Repository: eagle Updated Branches: refs/heads/master a71a36bf4 -> 7e41ed0b9
[MINOR] Fix metadata updating bug by impl equals/hashCode Fix metadata updating bug by impl equals/hashCode Author: Hao Chen <[email protected]> Closes #868 from haoch/FixMetadataUpdate. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/7e41ed0b Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/7e41ed0b Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/7e41ed0b Branch: refs/heads/master Commit: 7e41ed0b9d9c6bcbd5fef156ae5ca1cdc7e16c28 Parents: a71a36b Author: Hao Chen <[email protected]> Authored: Mon Mar 13 17:15:39 2017 +0800 Committer: Hao Chen <[email protected]> Committed: Mon Mar 13 17:15:39 2017 +0800 ---------------------------------------------------------------------- .../app/messaging/KafkaStreamSinkConfig.java | 52 +++++++++++++++- .../app/messaging/KafkaStreamSourceConfig.java | 65 ++++++++++++++++++++ .../impl/StreamMetadataUpdateServiceImpl.java | 9 +-- .../apache/eagle/metadata/model/StreamDesc.java | 35 ++++++++++- 4 files changed, 154 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java index 4a72709..bdc4f53 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java @@ -16,8 +16,11 @@ */ package org.apache.eagle.app.messaging; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.metadata.model.StreamSinkConfig; +import java.util.Objects; + /** * FIXME Rename to KafkaStreamMessagingConfig. */ @@ -111,5 +114,52 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig { return KafkaStreamSinkConfig.class; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaStreamSinkConfig)) { + return false; + } + + KafkaStreamSinkConfig config = (KafkaStreamSinkConfig) o; + + if (!getTopicId().equals(config.getTopicId())) { + return false; + } + if (getBrokerList() != null ? !getBrokerList().equals(config.getBrokerList()) : config.getBrokerList() != null) { + return false; + } + if (getSerializerClass() != null ? !getSerializerClass().equals(config.getSerializerClass()) : config.getSerializerClass() != null) { + return false; + } + if (getKeySerializerClass() != null ? !getKeySerializerClass().equals(config.getKeySerializerClass()) : config.getKeySerializerClass() != null) { + return false; + } + if (getNumBatchMessages() != null ? !getNumBatchMessages().equals(config.getNumBatchMessages()) : config.getNumBatchMessages() != null) { + return false; + } + if (getMaxQueueBufferMs() != null ? !getMaxQueueBufferMs().equals(config.getMaxQueueBufferMs()) : config.getMaxQueueBufferMs() != null) { + return false; + } + if (getProducerType() != null ? !getProducerType().equals(config.getProducerType()) : config.getProducerType() != null) { + return false; + } + return getRequestRequiredAcks() != null ? getRequestRequiredAcks().equals(config.getRequestRequiredAcks()) : config.getRequestRequiredAcks() == null; -} + } + + @Override + public int hashCode() { + int result = getTopicId().hashCode(); + result = 31 * result + (getBrokerList() != null ? getBrokerList().hashCode() : 0); + result = 31 * result + (getSerializerClass() != null ? getSerializerClass().hashCode() : 0); + result = 31 * result + (getKeySerializerClass() != null ? getKeySerializerClass().hashCode() : 0); + result = 31 * result + (getNumBatchMessages() != null ? getNumBatchMessages().hashCode() : 0); + result = 31 * result + (getMaxQueueBufferMs() != null ? getMaxQueueBufferMs().hashCode() : 0); + result = 31 * result + (getProducerType() != null ? getProducerType().hashCode() : 0); + result = 31 * result + (getRequestRequiredAcks() != null ? getRequestRequiredAcks().hashCode() : 0); + return result; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java index e6fdb83..d0a91da 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java @@ -148,4 +148,69 @@ public class KafkaStreamSourceConfig implements StreamSourceConfig { public void setTopicId(String topicId) { this.topicId = topicId; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaStreamSourceConfig)) { + return false; + } + + KafkaStreamSourceConfig that = (KafkaStreamSourceConfig) o; + + if (getFetchSize() != that.getFetchSize()) { + return false; + } + if (getTransactionStateUpdateMS() != that.getTransactionStateUpdateMS()) { + return false; + } + if (getStartOffsetTime() != that.getStartOffsetTime()) { + return false; + } + if (isForceFromStart() != that.isForceFromStart()) { + return false; + } + if (getTopicId() != null ? !getTopicId().equals(that.getTopicId()) : that.getTopicId() != null) { + return false; + } + if (getBrokerZkQuorum() != null ? !getBrokerZkQuorum().equals(that.getBrokerZkQuorum()) : that.getBrokerZkQuorum() != null) { + return false; + } + if (getBrokerZkBasePath() != null ? !getBrokerZkBasePath().equals(that.getBrokerZkBasePath()) : that.getBrokerZkBasePath() != null) { + return false; + } + if (getTransactionZkServers() != null ? !getTransactionZkServers().equals(that.getTransactionZkServers()) : that.getTransactionZkServers() != null) { + return false; + } + if (getTransactionZKRoot() != null ? !getTransactionZKRoot().equals(that.getTransactionZKRoot()) : that.getTransactionZKRoot() != null) { + return false; + } + if (getConsumerGroupId() != null ? !getConsumerGroupId().equals(that.getConsumerGroupId()) : that.getConsumerGroupId() != null) { + return false; + } + if (getBrokerZkPath() != null ? !getBrokerZkPath().equals(that.getBrokerZkPath()) : that.getBrokerZkPath() != null) { + return false; + } + return getSchemaClass() != null ? getSchemaClass().equals(that.getSchemaClass()) : that.getSchemaClass() == null; + + } + + @Override + public int hashCode() { + int result = getTopicId() != null ? getTopicId().hashCode() : 0; + result = 31 * result + (getBrokerZkQuorum() != null ? getBrokerZkQuorum().hashCode() : 0); + result = 31 * result + (getBrokerZkBasePath() != null ? getBrokerZkBasePath().hashCode() : 0); + result = 31 * result + (getTransactionZkServers() != null ? getTransactionZkServers().hashCode() : 0); + result = 31 * result + getFetchSize(); + result = 31 * result + (getTransactionZKRoot() != null ? getTransactionZKRoot().hashCode() : 0); + result = 31 * result + (getConsumerGroupId() != null ? getConsumerGroupId().hashCode() : 0); + result = 31 * result + (getBrokerZkPath() != null ? getBrokerZkPath().hashCode() : 0); + result = 31 * result + (int) (getTransactionStateUpdateMS() ^ (getTransactionStateUpdateMS() >>> 32)); + result = 31 * result + getStartOffsetTime(); + result = 31 * result + (isForceFromStart() ? 1 : 0); + result = 31 * result + (getSchemaClass() != null ? getSchemaClass().hashCode() : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java index 10bad33..274f0da 100644 --- a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java @@ -76,17 +76,18 @@ class StreamMetadataUpdateServiceImpl implements StreamMetadataUpdateService { List<StreamDesc> streamDescList = appEntity.getStreams(); if (streamDescList != null && streamDescList.size() > 0) { for (StreamDesc streamDesc : streamDescList) { + total++; latestStreamIdDescMap.put(streamDesc.getStreamId(), streamDesc); - if (streamIdDescMap.containsKey(streamDesc.getStreamId()) && !Objects.equals(streamDesc, streamIdDescMap.get(streamDesc.getStreamId()))) { - this.listener.onStreamChanged(streamDesc); + if (streamIdDescMap.containsKey(streamDesc.getStreamId()) + && !streamDesc.equals(streamIdDescMap.get(streamDesc.getStreamId()))) { changed++; + this.listener.onStreamChanged(streamDesc); } else if (!streamIdDescMap.containsKey(streamDesc.getStreamId())) { added++; this.listener.onStreamAdded(streamDesc); } } } - total++; } for (String streamId : streamIdDescMap.keySet()) { @@ -95,12 +96,12 @@ class StreamMetadataUpdateServiceImpl implements StreamMetadataUpdateService { this.listener.onStreamRemoved(streamIdDescMap.get(streamId)); } } - this.streamIdDescMap = latestStreamIdDescMap; if (added > 0 || changed > 0 || removed > 0) { LOGGER.info("Loaded stream metadata: total = {}, added = {}, changed = {}, removed = {}", total, added, changed, removed); } else { LOGGER.debug("Loaded stream metadata: total = {}, added = {}, changed = {}, removed = {}", total, added, changed, removed); } + this.streamIdDescMap = latestStreamIdDescMap; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java index a183351..a5d5ca8 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java +++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java @@ -19,8 +19,6 @@ package org.apache.eagle.metadata.model; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.metadata.utils.StreamIdConversions; -import javax.xml.transform.stream.StreamSource; - public class StreamDesc { private String streamId; private StreamDefinition schema; @@ -58,4 +56,37 @@ public class StreamDesc { public void setSourceConfig(StreamSourceConfig sourceConfig) { this.sourceConfig = sourceConfig; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof StreamDesc)) { + return false; + } + + StreamDesc that = (StreamDesc) o; + + if (!getStreamId().equals(that.getStreamId())) { + return false; + } + if (getSchema() != null ? !getSchema().equals(that.getSchema()) : that.getSchema() != null) { + return false; + } + if (getSinkConfig() != null ? !getSinkConfig().equals(that.getSinkConfig()) : that.getSinkConfig() != null) { + return false; + } + return getSourceConfig() != null ? getSourceConfig().equals(that.getSourceConfig()) : that.getSourceConfig() == null; + + } + + @Override + public int hashCode() { + int result = getStreamId().hashCode(); + result = 31 * result + (getSchema() != null ? getSchema().hashCode() : 0); + result = 31 * result + (getSinkConfig() != null ? getSinkConfig().hashCode() : 0); + result = 31 * result + (getSourceConfig() != null ? getSourceConfig().hashCode() : 0); + return result; + } } \ No newline at end of file
