http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java deleted file mode 100644 index 189e2a5..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.coordination.model.internal; - -import java.util.HashSet; -import java.util.Set; - -/** - * @since Mar 24, 2016 Logically one unit topology consists of S spouts, G - * groupby bolts, A alertBolts normally S=1 Physically each spout is - * composed of s spout nodes, each groupby bolt is composed of g groupby - * nodes, and each alert bolt is composed of a alert nodes - */ -public class Topology { - - private String name; - // number of logical nodes - private int numOfSpout; - private int numOfAlertBolt; - private int numOfGroupBolt; - private int numOfPublishBolt; - private String spoutId; - private String pubBoltId; - @Deprecated - private Set<String> groupNodeIds; - @Deprecated - private Set<String> alertBoltIds; - - // number of physical nodes for each logic bolt - private int spoutParallelism = 1; - private int groupParallelism = 1; - private int alertParallelism = 1; - - private String clusterName; - - public Topology() { - } - - public Topology(String name, int group, int alert) { - this.name = name; - this.numOfSpout = 1; - this.numOfGroupBolt = group; - this.numOfAlertBolt = alert; - groupNodeIds = new HashSet<String>(group); - alertBoltIds = new HashSet<String>(alert); - - spoutParallelism = 1; - groupParallelism = 1; - alertParallelism = 1; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getNumOfSpout() { - return numOfSpout; - } - - public void setNumOfSpout(int numOfSpout) { - this.numOfSpout = numOfSpout; - } - - public int getNumOfAlertBolt() { - return numOfAlertBolt; - } - - public void setNumOfAlertBolt(int numOfAlertBolt) { - this.numOfAlertBolt = numOfAlertBolt; - } - - public int getNumOfGroupBolt() { - return numOfGroupBolt; - } - - public void setNumOfGroupBolt(int numOfGroupBolt) { - this.numOfGroupBolt = numOfGroupBolt; - } - - public String getSpoutId() { - return spoutId; - } - - public void setSpoutId(String spoutId) { - 
this.spoutId = spoutId; - } - - public String getPubBoltId() { - return pubBoltId; - } - - public void setPubBoltId(String pubBoltId) { - this.pubBoltId = pubBoltId; - } - - public Set<String> getGroupNodeIds() { - return groupNodeIds; - } - - public void setGroupNodeIds(Set<String> groupNodeIds) { - this.groupNodeIds = groupNodeIds; - } - - public Set<String> getAlertBoltIds() { - return alertBoltIds; - } - - public void setAlertBoltIds(Set<String> alertBoltIds) { - this.alertBoltIds = alertBoltIds; - } - - public int getNumOfPublishBolt() { - return numOfPublishBolt; - } - - public void setNumOfPublishBolt(int numOfPublishBolt) { - this.numOfPublishBolt = numOfPublishBolt; - } - - public int getSpoutParallelism() { - return spoutParallelism; - } - - public void setSpoutParallelism(int spoutParallelism) { - this.spoutParallelism = spoutParallelism; - } - - public int getGroupParallelism() { - return groupParallelism; - } - - public void setGroupParallelism(int groupParallelism) { - this.groupParallelism = groupParallelism; - } - - public int getAlertParallelism() { - return alertParallelism; - } - - public void setAlertParallelism(int alertParallelism) { - this.alertParallelism = alertParallelism; - } - - public String getClusterName() { - return clusterName; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java deleted file mode 100644 index 3272d28..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.codec; - -import org.apache.eagle.alert.engine.model.StreamEvent; - -/** - * @since Apr 5, 2016 - * - */ -public interface IStreamCodec { - - StreamEvent decode(byte[] contents); - - byte[] encode(StreamEvent tuple); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java deleted file mode 100644 index 49726e9..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.codec; - -import org.apache.eagle.alert.engine.model.StreamEvent; - -/** - * This is used for event codec. 
- * - * @since Apr 5, 2016 - * - */ -public class SherlockEventCodec implements IStreamCodec { - - @Override - public StreamEvent decode(byte[] contents) { - // TODO Auto-generated method stub - return null; - } - - @Override - public byte[] encode(StreamEvent tuple) { - // TODO Auto-generated method stub - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java deleted file mode 100644 index 0e875d8..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.codec; - -import org.apache.eagle.alert.engine.model.StreamEvent; - -/** - * @since Apr 5, 2016 - * - */ -public class SherlockMetricCodec implements IStreamCodec { - - @Override - public StreamEvent decode(byte[] contents) { - // TODO Auto-generated method stub - return null; - } - - @Override - public byte[] encode(StreamEvent tuple) { - // TODO Auto-generated method stub - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java deleted file mode 100644 index f97eb2b..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.coordinator; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.builder.HashCodeBuilder; - -/** - * @since Apr 5, 2016 - * - */ -public class PolicyDefinition implements Serializable{ - private static final long serialVersionUID = 377581499339572414L; - // unique identifier - private String name; - private String description; - private List<String> inputStreams = new ArrayList<String>(); - private List<String> outputStreams = new ArrayList<String>(); - - private Definition definition; - - // one stream only have one partition in one policy, since we don't support stream alias - private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>(); - - // runtime configuration for policy, these are user-invisible - private int parallelismHint = 1; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public List<String> getInputStreams() { - return inputStreams; - } - - public void setInputStreams(List<String> inputStreams) { - this.inputStreams = inputStreams; - } - - public List<String> getOutputStreams() { - return outputStreams; - } - - public void setOutputStreams(List<String> outputStreams) { - this.outputStreams = outputStreams; - } - - public Definition getDefinition() { - return definition; - } - - public void setDefinition(Definition definition) { - this.definition = definition; - } - - public List<StreamPartition> getPartitionSpec() { - return partitionSpec; - } - - public void setPartitionSpec(List<StreamPartition> partitionSpec) { - this.partitionSpec = partitionSpec; - } - - public void addPartition(StreamPartition par) { - this.partitionSpec.add(par); - } - - 
public int getParallelismHint() { - return parallelismHint; - } - - public void setParallelismHint(int parallelism) { - this.parallelismHint = parallelism; - } - - @Override - public int hashCode() { - return new HashCodeBuilder(). - append(name). - append(description). - append(inputStreams). - append(outputStreams). - append(definition). - append(partitionSpec). -// append(parallelismHint). - build(); - } - - @Override - public boolean equals(Object that){ - if(that == this) - return true; - if(! (that instanceof PolicyDefinition)) - return false; - PolicyDefinition another = (PolicyDefinition)that; - if(another.name.equals(this.name) && - another.description.equals(this.description) && - CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) && - CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) && - another.definition.equals(this.definition) && - CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) -// && another.parallelismHint == this.parallelismHint - ) { - return true; - } - return false; - } - - public static class Definition implements Serializable{ - private static final long serialVersionUID = -622366527887848346L; - - public String type; - public String value; - - public Definition(String type,String value){ - this.type = type; - this.value = value; - } - - public Definition() { - this.type = null; - this.value = null; - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(type).append(value).build(); - } - - @Override - public boolean equals(Object that){ - if(that == this) - return true; - if(!(that instanceof Definition)) - return false; - Definition another = (Definition)that; - if(another.type.equals(this.type) && - another.value.equals(this.value)) - return true; - return false; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getValue() { - return value; - } - - 
public void setValue(String value) { - this.value = value; - } - - @Override - public String toString() { - return String.format("{type=\"%s\",value=\"%s\"",type,value); - } - } - - @Override - public String toString() { - return String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition().toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java deleted file mode 100644 index d8b4f28..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.engine.coordinator; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * @since Apr 11, 2016 - * - */ -public class Publishment { - - private String name; - private String type; - private List<String> policyIds; - private String dedupIntervalMin; - private Map<String, String> properties; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public List<String> getPolicyIds() { - return policyIds; - } - - public void setPolicyIds(List<String> policyIds) { - this.policyIds = policyIds; - } - - public String getDedupIntervalMin() { - return dedupIntervalMin; - } - - public void setDedupIntervalMin(String dedupIntervalMin) { - this.dedupIntervalMin = dedupIntervalMin; - } - - public Map<String, String> getProperties() { - return properties; - } - - public void setProperties(Map<String, String> properties) { - this.properties = properties; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof Publishment) { - Publishment p = (Publishment) obj; - return (Objects.equals(name, p.getName()) && - Objects.equals(type, p.getType()) && - Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) && - Objects.equals(policyIds, p.getPolicyIds()) && - properties.equals(p.getProperties())); - } - return false; - } - - @Override - public int hashCode() { - return new HashCodeBuilder() - .append(name) - .append(type) - .append(dedupIntervalMin) - .append(policyIds) - .append(properties) - .build(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java ---------------------------------------------------------------------- 
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java deleted file mode 100644 index f34b971..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.eagle.alert.engine.coordinator; - -import java.util.Objects; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -public class PublishmentType { - - private String type; - private String className; - private String description; - private String fields; - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getClassName(){ - return className; - } - public void setClassName(String className){ - this.className = className; - } - - public String getDescription(){ - return description; - } - public void setDescription(String description){ - this.description = description; - } - - public String getFields() { - return fields; - } - - public void setFields(String fields) { - this.fields = fields; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PublishmentType) { - PublishmentType p = (PublishmentType) obj; - return (Objects.equals(className, p.getClassName()) && - Objects.equals(type, p.type) && - Objects.equals(description, p.getDescription()) && - Objects.equals(fields, p.getFields())); - } - return false; - } - - @Override - public int hashCode() { - return new HashCodeBuilder() - .append(className) - .append(type) - .append(description) - .append(fields) - .build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java deleted file mode 100644 index dc44571..0000000 --- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.coordinator; - -import java.io.Serializable; - -import com.fasterxml.jackson.annotation.JsonCreator; - -public class StreamColumn implements Serializable{ - private static final long serialVersionUID = -5457861313624389106L; - private String name; - private Type type; - private Object defaultValue; - private boolean required; - private String description; - - public String toString(){ - return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s]", name, type, defaultValue, required); - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public Type getType() { - return type; - } - - public void setType(Type type) { - this.type = type; - } - - public Object getDefaultValue() { - return defaultValue; - } - - public void setDefaultValue(Object defaultValue) { - this.defaultValue = defaultValue; - } - - public boolean isRequired() { - return required; - } - - public void setRequired(boolean 
required) { - this.required = required; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - - public enum Type implements Serializable{ - STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object"); - - private final String name; - - Type(String name){ - this.name = name; - } - - @Override - public String toString() { - return name; - } - - @JsonCreator - public static Type getEnumFromValue(String value) { - for (Type testEnum : values()) { - if (testEnum.name.equals(value)) { - return testEnum; - } - } - throw new IllegalArgumentException(); - } - } - - public static class Builder { - private StreamColumn column; - - public Builder(){ - column = new StreamColumn(); - } - public Builder name(String name){ - column.setName(name); - return this; - } - public Builder type(Type type){ - column.setType(type); - return this; - } - public Builder defaultValue(Object defaultValue){ - column.setDefaultValue(defaultValue); - return this; - } - public Builder required(boolean required){ - column.setRequired(required); - return this; - } - - public StreamColumn build(){ - return column; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java deleted file mode 100644 index cd5773a..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java +++ /dev/null @@ 
-1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.coordinator; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * This is actually a data source schema. 
- * - * @since Apr 5, 2016 - * - */ -public class StreamDefinition implements Serializable{ - private static final long serialVersionUID = 2352202882328931825L; - private String streamId; - private String dataSource; - private String description; - private boolean validate; - private boolean timeseries; - - private List<StreamColumn> columns = new ArrayList<StreamColumn>(); - - public String toString(){ - return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s", - streamId, - dataSource, - description, - validate, - timeseries, - columns); - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public boolean isValidate() { - return validate; - } - - public void setValidate(boolean validate) { - this.validate = validate; - } - - public boolean isTimeseries() { - return timeseries; - } - - public void setTimeseries(boolean timeseries) { - this.timeseries = timeseries; - } - - public List<StreamColumn> getColumns() { - return columns; - } - - public void setColumns(List<StreamColumn> columns) { - this.columns = columns; - } - - public String getDataSource() { - return dataSource; - } - - public void setDataSource(String dataSource) { - this.dataSource = dataSource; - } - - public int getColumnIndex(String column){ - int i=0; - for(StreamColumn col:this.getColumns()){ - if(col.getName().equals(column)) return i; - i++; - } - return -1; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java deleted file mode 100644 index 7b96024..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.eagle.alert.engine.coordinator;

import java.io.Serializable;
import java.util.*;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;

/**
 * StreamPartition defines how a data stream is partitioned and sorted.
 * <ul>
 * <li>streamId distinguishes different streams which are spawned from the same data source</li>
 * <li>type defines how to partition data among slots within one slot queue</li>
 * <li>columns are the fields based on which the stream is grouped</li>
 * <li>sortSpec defines how data is sorted</li>
 * </ul>
 */
public class StreamPartition implements Serializable {
    private static final long serialVersionUID = -3361648309136926040L;

    private String streamId;
    private Type type;
    private List<String> columns = new ArrayList<>();
    private StreamSortSpec sortSpec;

    public StreamPartition() {
    }

    /**
     * Copy constructor: the column list and the sort spec are copied, so later changes to
     * {@code o} do not leak into the new instance.
     *
     * @param o the partition to copy, must not be null
     */
    public StreamPartition(StreamPartition o) {
        this.streamId = o.streamId;
        this.type = o.type;
        // setColumns(null) is allowed, so the source may legitimately carry a null list.
        this.columns = o.columns == null ? null : new ArrayList<String>(o.columns);
        this.sortSpec = o.sortSpec == null ? null : new StreamSortSpec(o.sortSpec);
    }

    /**
     * Two partitions are equal when stream id, type and sort spec are equal and both column
     * lists hold the same elements with the same cardinality, regardless of order.
     */
    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        }
        if (!(other instanceof StreamPartition)) {
            return false;
        }
        StreamPartition sp = (StreamPartition) other;
        return Objects.equals(streamId, sp.streamId)
            && Objects.equals(type, sp.type)
            && sameColumns(columns, sp.columns)
            && Objects.equals(sortSpec, sp.sortSpec);
    }

    /**
     * Hash code consistent with {@link #equals(Object)}: the columns contribute an
     * order-independent value because equality ignores column order.
     */
    @Override
    public int hashCode() {
        return new HashCodeBuilder()
            .append(streamId)
            .append(type)
            .append(orderInsensitiveHash(columns))
            .append(sortSpec)
            .build();
    }

    /** Null-safe, order-insensitive comparison of two column lists. */
    private static boolean sameColumns(List<String> left, List<String> right) {
        if (left == null || right == null) {
            return left == right;
        }
        return CollectionUtils.isEqualCollection(left, right);
    }

    /** Sums the element hashes so that any permutation of the same columns hashes alike. */
    private static int orderInsensitiveHash(List<String> cols) {
        if (cols == null) {
            return 0;
        }
        int sum = 0;
        for (String col : cols) {
            sum += Objects.hashCode(col);
        }
        return sum;
    }

    public void setType(Type type) {
        this.type = type;
    }

    public Type getType() {
        return this.type;
    }

    /**
     * Partition strategy, addressable either by its name or by its numeric index.
     */
    public enum Type {
        GLOBAL("GLOBAL", 0), GROUPBY("GROUPBY", 1), SHUFFLE("SHUFFLE", 2);

        private final String name;
        private final int index;

        Type(String name, int index) {
            this.name = name;
            this.index = index;
        }

        @Override
        public String toString() {
            return this.name;
        }

        /**
         * Resolves a type by name, ignoring case.
         *
         * @param type the type name, must not be null
         * @return the matching type
         * @throws IllegalStateException if no type has that name
         */
        public static Type locate(String type) {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
            Type found = _NAME_TYPE.get(type.toUpperCase(Locale.ROOT));
            if (found == null) {
                throw new IllegalStateException("Illegal type name: " + type);
            }
            return found;
        }

        /**
         * Resolves a type by its numeric index.
         *
         * @param index the index declared on the constant
         * @return the matching type
         * @throws IllegalStateException if no type has that index
         */
        public static Type locate(int index) {
            Type found = _INDEX_TYPE.get(index);
            if (found == null) {
                throw new IllegalStateException("Illegal type index: " + index);
            }
            return found;
        }

        private static final Map<String, Type> _NAME_TYPE = new HashMap<>();
        private static final Map<Integer, Type> _INDEX_TYPE = new TreeMap<>();

        static {
            // Register every constant under its own name and index; the previous hand-written
            // table mapped the GROUPBY and SHUFFLE indexes to GLOBAL.
            for (Type each : values()) {
                _NAME_TYPE.put(each.name, each);
                _INDEX_TYPE.put(each.index, each);
            }
        }
    }

    public List<String> getColumns() {
        return columns;
    }

    public void setColumns(List<String> columns) {
        this.columns = columns;
    }

    public String getStreamId() {
        return streamId;
    }

    public void setStreamId(String streamId) {
        this.streamId = streamId;
    }

    public StreamSortSpec getSortSpec() {
        return sortSpec;
    }

    public void setSortSpec(StreamSortSpec sortSpec) {
        this.sortSpec = sortSpec;
    }

    @Override
    public String toString() {
        return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]",
            this.getStreamId(), this.getType(), StringUtils.join(this.getColumns(), ","), sortSpec);
    }
}
package org.apache.eagle.alert.engine.coordinator;


import java.io.Serializable;
import java.util.Objects;

import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.eagle.alert.utils.TimePeriodUtils;
import org.joda.time.Period;

/**
 * Sort specification of a stream: a window period, stored as a period string that
 * {@link Period#parse(String)} can read, and a window margin that defaults to 30 seconds.
 */
public class StreamSortSpec implements Serializable {
    private static final long serialVersionUID = 3626506441441584937L;
    private String windowPeriod = "";
    private int windowMargin = 30 * 1000; // 30 seconds by default

    public StreamSortSpec() {
    }

    /**
     * Copy constructor.
     *
     * @param spec the spec to copy, must not be null
     */
    public StreamSortSpec(StreamSortSpec spec) {
        this.windowPeriod = spec.windowPeriod;
        this.windowMargin = spec.windowMargin;
    }

    public String getWindowPeriod() {
        return windowPeriod;
    }

    /**
     * @return the window period in milliseconds, or 0 when no period has been set
     *         (null or the empty default)
     */
    public int getWindowPeriodMillis() {
        // The field defaults to "", which Period.parse rejects; treat it like null, i.e. "not set".
        if (windowPeriod == null || windowPeriod.isEmpty()) {
            return 0;
        }
        return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
    }

    public void setWindowPeriod(String windowPeriod) {
        this.windowPeriod = windowPeriod;
    }

    public void setWindowPeriodMillis(int windowPeriodMillis) {
        this.windowPeriod = Period.millis(windowPeriodMillis).toString();
    }

    public void setWindowPeriod2(Period period) {
        this.windowPeriod = period.toString();
    }

    public int getWindowMargin() {
        return windowMargin;
    }

    public void setWindowMargin(int windowMargin) {
        this.windowMargin = windowMargin;
    }

    @Override
    public int hashCode() {
        return new HashCodeBuilder()
            .append(windowPeriod)
            .append(windowMargin)
            .toHashCode();
    }

    @Override
    public boolean equals(Object that) {
        if (this == that) {
            return true;
        }
        if (!(that instanceof StreamSortSpec)) {
            return false;
        }
        StreamSortSpec another = (StreamSortSpec) that;
        // Objects.equals: windowPeriod may be null because setWindowPeriod accepts null.
        return Objects.equals(another.windowPeriod, this.windowPeriod)
            && another.windowMargin == this.windowMargin;
    }

    @Override
    public String toString() {
        return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
            this.getWindowPeriod(),
            this.getWindowMargin());
    }
}
package org.apache.eagle.alert.engine.coordinator;

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Describes one streaming cluster: its name, zone, type, a free-text description and a
 * map of deployment settings.
 *
 * @since Apr 5, 2016
 */
public class StreamingCluster {

    /** Deployment key for the nimbus host. */
    public static final String NIMBUS_HOST = "nimbusHost";
    /** Deployment key for the nimbus thrift port. */
    public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";

    /** Supported streaming platforms. */
    public enum StreamingType {
        STORM
    }

    @JsonProperty
    private String name;

    @JsonProperty
    private String zone;

    @JsonProperty
    private StreamingType type;

    @JsonProperty
    private String description;

    /**
     * key - nimbus for storm
     */
    @JsonProperty
    private Map<String, String> deployments;

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getZone() {
        return this.zone;
    }

    public void setZone(String zone) {
        this.zone = zone;
    }

    public StreamingType getType() {
        return this.type;
    }

    public void setType(StreamingType type) {
        this.type = type;
    }

    public String getDescription() {
        return this.description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public Map<String, String> getDeployments() {
        return this.deployments;
    }

    public void setDeployments(Map<String, String> deployments) {
        this.deployments = deployments;
    }
}
package org.apache.eagle.alert.engine.model;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.utils.DateTimeUtil;

/**
 * An alert event. Here streamId stands for the alert type instead of the source event streamId.
 */
public class AlertStreamEvent extends StreamEvent {
    private static final long serialVersionUID = 2392131134670106397L;

    // TODO: Keep policy name only instead of policy entity
    private PolicyDefinition policy;
    private StreamDefinition schema;
    private String createdBy;
    private long createdTime;

    public PolicyDefinition getPolicy() {
        return policy;
    }

    public void setPolicy(PolicyDefinition policy) {
        this.policy = policy;
    }

    /**
     * @return the name of the policy attached to this alert
     * @throws NullPointerException if no policy has been set
     */
    public String getPolicyId() {
        return policy.getName();
    }

    /**
     * Renders the event for logging. Safe to call on a partially populated event: a missing
     * data array prints as an empty list and a missing policy prints as {@code null}.
     */
    @Override
    public String toString() {
        Object[] data = this.getData();
        List<String> dataStrings = new ArrayList<>(data == null ? 0 : data.length);
        if (data != null) {
            for (Object obj : data) {
                dataStrings.add(obj == null ? null : obj.toString());
            }
        }
        PolicyDefinition currentPolicy = this.getPolicy();
        String policyName = currentPolicy == null ? null : currentPolicy.getName();
        // NOTE: "%S" (upper case) prints the stream id upper-cased; kept as in StreamEvent.toString().
        return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policy=%s, createdBy=%s]",
            this.getStreamId(),
            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
            StringUtils.join(dataStrings, ","),
            policyName,
            this.getCreatedBy());
    }

    public String getCreatedBy() {
        return createdBy;
    }

    public void setCreatedBy(String createdBy) {
        this.createdBy = createdBy;
    }

    public StreamDefinition getSchema() {
        return schema;
    }

    public void setSchema(StreamDefinition schema) {
        this.schema = schema;
    }

    public long getCreatedTime() {
        return createdTime;
    }

    public void setCreatedTime(long createdTime) {
        this.createdTime = createdTime;
    }
}
package org.apache.eagle.alert.engine.model;

import java.io.Serializable;
import java.util.Objects;

import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;

import backtype.storm.tuple.Tuple;

/**
 * This is a critical data structure across spout, router bolt and alert bolt.
 * <ul>
 * <li>partition[StreamPartition] defines how one incoming data stream is partitioned, sorted</li>
 * <li>partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition</li>
 * <li>event[StreamEvent] is actual data</li>
 * </ul>
 */
public class PartitionedEvent implements Serializable {
    private static final long serialVersionUID = -3840016190614238593L;
    private StreamPartition partition;
    private long partitionKey;
    private StreamEvent event;

    /**
     * Used for bolt-internal but not inter-bolts,
     * will not pass across bolts
     */
    private transient Tuple anchor;

    public PartitionedEvent() {
        this.event = null;
        this.partition = null;
        this.partitionKey = 0L;
    }

    public PartitionedEvent(StreamEvent event, StreamPartition partition, int partitionKey) {
        this.event = event;
        this.partition = partition;
        this.partitionKey = partitionKey;
    }

    /**
     * Equal when partition key, event, partition and anchor are all equal.
     * The anchor is compared here but is not part of {@link #hashCode()}.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof PartitionedEvent)) {
            return false;
        }
        PartitionedEvent another = (PartitionedEvent) obj;
        return this.partitionKey == another.getPartitionKey()
            && Objects.equals(this.event, another.getEvent())
            && Objects.equals(this.partition, another.getPartition())
            && Objects.equals(this.anchor, another.anchor);
    }

    @Override
    public int hashCode() {
        return new HashCodeBuilder()
            .append(partitionKey)
            .append(event)
            .append(partition)
            .build();
    }

    public StreamEvent getEvent() {
        return event;
    }

    public void setEvent(StreamEvent event) {
        this.event = event;
    }

    public StreamPartition getPartition() {
        return partition;
    }

    public void setPartition(StreamPartition partition) {
        this.partition = partition;
    }

    public void setPartitionKey(long partitionKey) {
        this.partitionKey = partitionKey;
    }

    public long getPartitionKey() {
        return this.partitionKey;
    }

    @Override
    public String toString() {
        // The closing bracket was missing from the original format string.
        return String.format("PartitionedEvent[partition=%s,event=%s,key=%s]", partition, event, partitionKey);
    }

    /** @return the wrapped event's timestamp, or 0 when there is no event */
    public long getTimestamp() {
        return (event != null) ? event.getTimestamp() : 0L;
    }

    /** @return the wrapped event's stream id, or null when there is no event */
    public String getStreamId() {
        return (event != null) ? event.getStreamId() : null;
    }

    /** @return the wrapped event's data array, or null when there is no event */
    public Object[] getData() {
        return event != null ? event.getData() : null;
    }

    /** @return true when a partition is set and it carries a sort spec */
    public boolean isSortRequired() {
        return isPartitionRequired() && this.getPartition().getSortSpec() != null;
    }

    /** @return true when a partition is set */
    public boolean isPartitionRequired() {
        return this.getPartition() != null;
    }

    /**
     * Creates a new instance referring to the same event and partition with the same key.
     * The anchor is not carried over.
     */
    public PartitionedEvent copy() {
        PartitionedEvent copied = new PartitionedEvent();
        copied.setEvent(this.getEvent());
        copied.setPartition(this.partition);
        copied.setPartitionKey(this.partitionKey);
        return copied;
    }

    public Tuple getAnchor() {
        return anchor;
    }

    public void setAnchor(Tuple anchor) {
        this.anchor = anchor;
    }

    /** Sets the anchor and returns this instance for chaining. */
    public PartitionedEvent withAnchor(Tuple tuple) {
        this.setAnchor(tuple);
        return this;
    }
}
package org.apache.eagle.alert.engine.model;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.utils.DateTimeUtil;

/**
 * A single event of a stream: the stream id, a timestamp and the positional data values.
 *
 * @since Apr 5, 2016
 */
public class StreamEvent implements Serializable {
    private static final long serialVersionUID = 2765116509856609763L;

    private String streamId;
    private Object[] data;
    private long timestamp;

    public StreamEvent() {
    }

    public StreamEvent(String streamId, long timestamp, Object[] data) {
        this.setStreamId(streamId);
        this.setTimestamp(timestamp);
        this.setData(data);
    }

    public String getStreamId() {
        return this.streamId;
    }

    public void setStreamId(String streamId) {
        this.streamId = streamId;
    }

    public Object[] getData() {
        return this.data;
    }

    public void setData(Object[] data) {
        this.data = data;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public int hashCode() {
        HashCodeBuilder hash = new HashCodeBuilder();
        hash.append(streamId);
        hash.append(timestamp);
        hash.append(data);
        return hash.build();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof StreamEvent)) {
            return false;
        }
        StreamEvent other = (StreamEvent) obj;
        if (!Objects.equals(this.streamId, other.streamId)) {
            return false;
        }
        if (this.timestamp != other.timestamp) {
            return false;
        }
        return Arrays.deepEquals(this.data, other.data);
    }

    @Override
    public String toString() {
        List<String> rendered = new ArrayList<>();
        Object[] values = this.getData();
        if (values != null) {
            for (Object value : values) {
                rendered.add(value == null ? null : value.toString());
            }
        }
        String humanTime = DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp());
        return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s]]",
            this.getStreamId(), humanTime, StringUtils.join(rendered, ","));
    }

    /** @return a new, empty builder */
    public static StreamEventBuilder Builder() {
        return new StreamEventBuilder();
    }

    /**
     * The new event refers to the same data array as this one; the array is not duplicated.
     *
     * @return cloned new event object
     */
    public StreamEvent copy() {
        StreamEvent duplicate = new StreamEvent();
        duplicate.setTimestamp(this.getTimestamp());
        duplicate.setData(this.getData());
        duplicate.setStreamId(this.getStreamId());
        return duplicate;
    }

    /**
     * Overwrites timestamp, data and stream id of this event with the values of {@code event}.
     *
     * @param event the event to read from
     */
    public void copyFrom(StreamEvent event) {
        this.setTimestamp(event.getTimestamp());
        this.setData(event.getData());
        this.setStreamId(event.getStreamId());
    }

    /**
     * Picks the values of the named columns, in the order the names are given.
     *
     * @param streamDefinition schema used to resolve each column name to its index
     * @param column           column names to pick
     * @return the selected values
     */
    public Object[] getData(StreamDefinition streamDefinition, List<String> column) {
        Object[] picked = new Object[column.size()];
        int next = 0;
        for (String colName : column) {
            picked[next++] = this.getData()[streamDefinition.getColumnIndex(colName)];
        }
        return picked;
    }

    /**
     * Picks the values of the named columns, in the order the names are given.
     *
     * @param streamDefinition schema used to resolve each column name to its index
     * @param column           column names to pick
     * @return the selected values
     */
    public Object[] getData(StreamDefinition streamDefinition, String... column) {
        Object[] picked = new Object[column.length];
        for (int i = 0; i < column.length; i++) {
            picked[i] = this.getData()[streamDefinition.getColumnIndex(column[i])];
        }
        return picked;
    }
}
package org.apache.eagle.alert.engine.model;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent builder for {@link StreamEvent}. One builder populates one event instance, which
 * {@link #build()} hands out.
 */
public class StreamEventBuilder {
    private final static Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);

    private StreamEvent instance;
    private StreamDefinition streamDefinition;

    public StreamEventBuilder() {
        instance = new StreamEvent();
    }

    /**
     * Remembers the schema and, when no stream id has been set yet, takes the stream id from it.
     *
     * @param streamDefinition the schema of the event being built
     * @return this builder
     */
    public StreamEventBuilder schema(StreamDefinition streamDefinition) {
        this.streamDefinition = streamDefinition;
        if (instance.getStreamId() == null) {
            instance.setStreamId(streamDefinition.getStreamId());
        }
        return this;
    }

    public StreamEventBuilder streamId(String streamId) {
        instance.setStreamId(streamId);
        return this;
    }

    /**
     * Fills the event data from a name-to-value map, ordered by the schema's columns. A column
     * missing from {@code data} gets the column's default value. When the schema declares no
     * columns the data is ignored.
     *
     * @param data             values keyed by column name
     * @param streamDefinition schema that defines column order and defaults, must not be null
     * @return this builder
     * @throws NullPointerException if {@code streamDefinition} is null
     */
    public StreamEventBuilder attributes(Map<String, Object> data, StreamDefinition streamDefinition) {
        // Fail with an explanatory message instead of a bare NPE further down, e.g. when
        // attributes(Map) is called before schema(...).
        Objects.requireNonNull(streamDefinition, "streamDefinition is null: call schema(...) first or pass a schema");
        this.schema(streamDefinition);
        List<StreamColumn> columnList = streamDefinition.getColumns();
        if (columnList != null && !columnList.isEmpty()) {
            List<Object> values = new ArrayList<>(columnList.size());
            for (StreamColumn column : columnList) {
                values.add(data.getOrDefault(column.getName(), column.getDefaultValue()));
            }
            instance.setData(values.toArray());
        } else {
            // Previously a WARN message guarded by isDebugEnabled(); log at the level the guard
            // asked for. Parameterized logging needs no explicit guard.
            LOG.debug("All data [{}] are ignored as no columns defined in schema {}", data, streamDefinition);
        }
        return this;
    }

    /**
     * Same as {@link #attributes(Map, StreamDefinition)} using the schema given to
     * {@link #schema(StreamDefinition)}.
     */
    public StreamEventBuilder attributes(Map<String, Object> data) {
        return attributes(data, this.streamDefinition);
    }

    /** Sets the positional data values directly. */
    public StreamEventBuilder attributes(Object... data) {
        instance.setData(data);
        return this;
    }

    /** Sets the event timestamp. */
    public StreamEventBuilder timestamp(long timestamp) {
        instance.setTimestamp(timestamp);
        return this;
    }

    /**
     * Misspelled variant of {@link #timestamp(long)}, kept so existing callers keep compiling.
     */
    public StreamEventBuilder timestamep(long timestamp) {
        return timestamp(timestamp);
    }

    /**
     * @return the event populated so far
     * @throws IllegalArgumentException if no stream id has been set
     */
    public StreamEvent build() {
        if (instance.getStreamId() == null) {
            throw new IllegalArgumentException("streamId is null of event: " + instance);
        }
        return instance;
    }

    /** Copies timestamp, data and stream id from {@code event} into the event being built. */
    public StreamEventBuilder copyFrom(StreamEvent event) {
        this.instance.copyFrom(event);
        return this;
    }
}
package org.apache.eagle.alert.metric;

import java.util.Map;

import org.apache.eagle.alert.metric.sink.MetricSink;
import org.apache.eagle.alert.metric.source.MetricSource;

import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;

/**
 * Contract of a metric system: sinks and sources are registered with it, and it is then
 * started, scheduled, asked to report, and stopped.
 */
public interface IMetricSystem {

    /**
     * Initialize
     */
    void start();

    /**
     * Schedule reporter
     */
    void schedule();

    /**
     * Close and stop all resources and services
     */
    void stop();

    /**
     * Manual report metric
     */
    void report();

    /**
     * Registers a metric sink.
     *
     * @param sink   metric sink
     * @param config configuration associated with the sink
     */
    void register(MetricSink sink, Config config);

    /**
     * Registers a metric source.
     *
     * @param source metric source
     */
    void register(MetricSource source);

    /**
     * Supplies metric tags.
     *
     * @param metricTags tag values keyed by tag name
     */
    void tags(Map<String, Object> metricTags);

    /**
     * @return the metric registry of this system
     */
    MetricRegistry registry();
}
package org.apache.eagle.alert.metric;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.eagle.alert.metric.sink.MetricSink;
import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
import org.apache.eagle.alert.metric.source.MetricSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * {@link IMetricSystem} implementation that keeps its sinks in a map together with their
 * configuration and owns a single {@link MetricRegistry}.
 */
public class MetricSystem implements IMetricSystem {
    private final static Logger LOG = LoggerFactory.getLogger(MetricSystem.class);

    private final Config config;
    private final Map<String, Object> metricTags = new HashMap<>();
    private Map<MetricSink, Config> sinks = new HashMap<>();
    private MetricRegistry registry = new MetricRegistry();
    private boolean running;
    private boolean initialized;

    public MetricSystem(Config config) {
        this.config = config;
    }

    /**
     * Creates a metric system and registers the sinks declared in {@code config}.
     *
     * @param config configuration to read the sinks from
     * @return the populated metric system
     */
    public static MetricSystem load(Config config) {
        MetricSystem system = new MetricSystem(config);
        system.loadFromConfig();
        return system;
    }

    @Override
    public void tags(Map<String, Object> metricTags) {
        this.metricTags.putAll(metricTags);
    }

    /**
     * Prepares every registered sink with its configuration, extended by a "tags" entry built
     * from the metric tags, and with the registry.
     *
     * @throws IllegalStateException if called more than once
     */
    @Override
    public void start() {
        if (initialized) {
            throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already intialized");
        }
        for (Map.Entry<MetricSink, Config> entry : sinks.entrySet()) {
            Config sinkConfig = entry.getValue().withValue("tags", ConfigFactory.parseMap(metricTags).root());
            entry.getKey().prepare(sinkConfig, registry);
        }
        initialized = true;
    }

    /**
     * Starts every registered sink with a period of 5 seconds.
     *
     * @throws IllegalStateException if called more than once
     */
    @Override
    public void schedule() {
        if (running) {
            throw new IllegalStateException("Attempting to start a MetricsSystem that is already running");
        }
        for (MetricSink sink : sinks.keySet()) {
            sink.start(5, TimeUnit.SECONDS);
        }
        running = true;
    }

    public void loadFromConfig() {
        loadSinksFromConfig();
    }

    /** Registers one sink per key found under "metric.sink"; does nothing when the path is absent. */
    private void loadSinksFromConfig() {
        if (!config.hasPath("metric.sink")) {
            return;
        }
        Config sinkSection = config.getConfig("metric.sink");
        for (String sinkType : sinkSection.root().unwrapped().keySet()) {
            register(MetricSinkRepository.createSink(sinkType), config.getConfig("metric.sink." + sinkType));
        }
    }

    @Override
    public void stop() {
        for (MetricSink sink : sinks.keySet()) {
            sink.stop();
        }
    }

    @Override
    public void report() {
        for (MetricSink sink : sinks.keySet()) {
            sink.report();
        }
    }

    @Override
    public void register(MetricSink sink, Config config) {
        LOG.debug("Register {}", sink);
        sinks.put(sink, config);
    }

    @Override
    public void register(MetricSource source) {
        registry().registerAll(source.registry());
    }

    @Override
    public MetricRegistry registry() {
        return registry;
    }
}
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.metric.entity; - -import java.util.Map; -import java.util.TreeMap; - -import org.apache.eagle.alert.utils.DateTimeUtil; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; - -public class MetricEvent extends TreeMap<String,Object>{ - - private static final long serialVersionUID = 6862373651636342744L; - - public static Builder of(String name){ - return new Builder(name); - } - - /** - * TODO: Refactor according to ConsoleReporter - */ - public static class Builder{ - private final String name; - private MetricEvent instance; - public Builder(String name){ - this.instance = new MetricEvent(); - this.name = name; - } - - public Builder from(Counter value) { -// this.instance.put("type","counter"); - this.instance.put("count",value.getCount()); - return this; - } - - public MetricEvent build(){ - 
this.instance.put("name",name); - if(!this.instance.containsKey("timestamp")){ - this.instance.put("timestamp", DateTimeUtil.getCurrentTimestamp()); - } - return this.instance; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Builder from(Gauge gauge) { - Object value = gauge.getValue(); - if( value instanceof Map){ - Map<? extends String, ?> map = (Map<? extends String, ?>) value; - this.instance.putAll(map); - } else { - this.instance.put("value", value); - } - return this; - } - - public Builder from(Histogram value) { - this.instance.put("count",value.getCount()); - Snapshot snapshot = value.getSnapshot(); - this.instance.put("min", snapshot.getMin()); - this.instance.put("max", snapshot.getMax()); - this.instance.put("mean", snapshot.getMean()); - this.instance.put("stddev", snapshot.getStdDev()); - this.instance.put("median", snapshot.getMedian()); - this.instance.put("75thPercentile", snapshot.get75thPercentile()); - this.instance.put("95thPercentile", snapshot.get95thPercentile()); - this.instance.put("98thPercentile", snapshot.get98thPercentile()); - this.instance.put("99thPercentile", snapshot.get99thPercentile()); - this.instance.put("999thPercentile", snapshot.get999thPercentile()); - return this; - } - - public Builder from(Meter value) { - this.instance.put("value",value.getCount()); - this.instance.put("15MinRate",value.getFifteenMinuteRate()); - this.instance.put("5MinRate",value.getFiveMinuteRate()); - this.instance.put("mean",value.getMeanRate()); - this.instance.put("1MinRate",value.getOneMinuteRate()); - return this; - } - - public Builder from(Timer value) { -// this.instance.put("type","timer"); - this.instance.put("value",value.getCount()); - this.instance.put("15MinRate",value.getFifteenMinuteRate()); - this.instance.put("5MinRate",value.getFiveMinuteRate()); - this.instance.put("mean",value.getMeanRate()); - this.instance.put("1MinRate",value.getOneMinuteRate()); - return this; - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java deleted file mode 100644 index 42bf5d5..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.metric.reporter; - -import java.util.Map; -import java.util.Properties; -import java.util.SortedMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.eagle.alert.metric.entity.MetricEvent; -import org.apache.eagle.alert.utils.ByteUtils; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.codahale.metrics.ConsoleReporter; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricFilter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.Timer; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; - -public class KafkaReporter extends ScheduledReporter { - private final static Logger LOG = LoggerFactory.getLogger(KafkaReporter.class); - private final String topic; - private final Properties properties; - private final Producer<byte[], String> producer; - private final Map<String, Object> additionalFields; - - protected KafkaReporter(MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, String topic, Properties config, Map<String, Object> additionalFields) { - super(registry, "kafka-reporter", filter, rateUnit, durationUnit); - this.topic = topic; - this.properties = new Properties(); - Preconditions.checkNotNull(topic,"topic should not be null"); -// properties.put("bootstrap.servers", brokerList); -// properties.put("metadata.broker.list", brokerList); - 
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - properties.put("request.required.acks", "1"); - properties.put("key.deserializer","org.apache.kafka.common.serialization.ByteArraySerializer"); - properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); - if(config != null) { - LOG.info(config.toString()); - properties.putAll(config); - } - this.additionalFields = additionalFields; - this.producer = new KafkaProducer<>(properties); - LOG.info("Initialized kafka-reporter"); - } - - @SuppressWarnings("rawtypes") - @Override - public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) { - for(SortedMap.Entry<String, Gauge> entry:gauges.entrySet()){ - onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build()); - } - for(SortedMap.Entry<String, Counter> entry:counters.entrySet()){ - onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build()); - } - for(SortedMap.Entry<String, Histogram> entry:histograms.entrySet()){ - onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build()); - } - for(SortedMap.Entry<String, Meter> entry:meters.entrySet()){ - onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build()); - } - for(SortedMap.Entry<String, Timer> entry:timers.entrySet()){ - onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build()); - } - } - - private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private void onMetricEvent(MetricEvent event){ - try { - if(additionalFields!=null){ - event.putAll(additionalFields); - } - // TODO: Support configurable partition key - byte[] key = ByteUtils.intToBytes(event.hashCode()); - ProducerRecord<byte[],String> 
record = new ProducerRecord<>(topic, key, OBJECT_MAPPER.writeValueAsString(event)); - // TODO: Support configuration timeout - this.producer.send(record).get(5,TimeUnit.SECONDS); - } catch (JsonProcessingException e) { - LOG.error("Failed to serialize {} as json",event,e); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.error("Failed to produce message to topic {}",topic,e); - } - } - - @Override - public void stop() { - this.producer.close(); - super.stop(); - } - - @Override - public void close() { - this.producer.close(); - super.close(); - } - - public static Builder forRegistry(MetricRegistry registry){ - return new Builder(registry); - } - - public static class Builder{ - private final MetricRegistry registry; - private TimeUnit rateUnit; - private TimeUnit durationUnit; - private MetricFilter filter; - private String topic; - private Properties properties; - private Map<String, Object> additionalFields; - - private Builder(MetricRegistry registry) { - this.registry = registry; - this.rateUnit = TimeUnit.SECONDS; - this.durationUnit = TimeUnit.MILLISECONDS; - this.filter = MetricFilter.ALL; - } - - /** - * Convert rates to the given time unit. - * - * @param rateUnit a unit of time - * @return {@code this} - */ - public Builder convertRatesTo(TimeUnit rateUnit) { - this.rateUnit = rateUnit; - return this; - } - - /** - * Convert durations to the given time unit. - * - * @param durationUnit a unit of time - * @return {@code this} - */ - public Builder convertDurationsTo(TimeUnit durationUnit) { - this.durationUnit = durationUnit; - return this; - } - - /** - * Only report metrics which match the given filter. 
- * - * @param filter a {@link MetricFilter} - * @return {@code this} - */ - public Builder filter(MetricFilter filter) { - this.filter = filter; - return this; - } - - public Builder topic(String topic){ - this.topic = topic; - return this; - } - - public Builder config(Properties properties){ - this.properties = properties; - return this; - } - - /** - * Builds a {@link ConsoleReporter} with the given properties. - * - * @return a {@link ConsoleReporter} - */ - public KafkaReporter build() { - if(topic == null && properties!=null) topic = properties.getProperty("topic"); - return new KafkaReporter(registry,filter,rateUnit,durationUnit,topic,properties,additionalFields); - } - - @SuppressWarnings("serial") - public Builder config(Config config) { - this.config(new Properties(){{ - putAll(config.root().unwrapped()); - }}); - return this; - } - - public Builder addFields(Map<String, Object> tags) { - this.additionalFields = tags; - return this; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java deleted file mode 100644 index fd6cc41..0000000 --- a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.apache.eagle.alert.metric.sink; - -import java.util.concurrent.TimeUnit; - -import com.codahale.metrics.ConsoleReporter; -import com.codahale.metrics.MetricRegistry; -import com.typesafe.config.Config; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class ConsoleSink implements MetricSink { - private ConsoleReporter reporter; - @Override - public void prepare(Config config, MetricRegistry registry) { - reporter = ConsoleReporter.forRegistry(registry).build(); - } - - @Override - public void start(long period,TimeUnit unit) { - reporter.start(period, unit); - } - - @Override - public void stop() { - reporter.stop(); - reporter.close(); - } - - @Override - public void report() { - reporter.report(); - } -} \ No newline at end of file