http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java index a4901ae..d25ebdd 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java @@ -1,23 +1,18 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ /** - * + * */ + package org.apache.storm.eventhubs.spout; import java.io.Serializable; @@ -26,5 +21,5 @@ import java.io.Serializable; * An abstract factory to generate EventHubReceiver */ public interface IEventHubReceiverFactory extends Serializable { - IEventHubReceiver create(EventHubSpoutConfig config, String partitionId); + IEventHubReceiver create(EventHubSpoutConfig config, String partitionId); }
http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java index e99f20a..d83c0cb 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java @@ -15,13 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.util.List; public interface IPartitionCoordinator { - List<IPartitionManager> getMyPartitionManagers(); + List<IPartitionManager> getMyPartitionManagers(); - IPartitionManager getPartitionManager(String partitionId); + IPartitionManager getPartitionManager(String partitionId); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java index d391edd..345fd48 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java @@ -15,23 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.util.Map; public interface IPartitionManager { - void open() throws Exception; + void open() throws Exception; + + void close(); - void close(); + EventDataWrap receive(); - EventDataWrap receive(); + void checkpoint(); - void checkpoint(); + void ack(String offset); - void ack(String offset); + void fail(String offset); - void fail(String offset); - - Map<String, Object> getMetricsData(); + Map<String, Object> getMetricsData(); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java index dc136eb..c3ce2ec 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java @@ -1,23 +1,18 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ /** - * + * */ + package org.apache.storm.eventhubs.spout; import java.io.Serializable; @@ -26,8 +21,8 @@ import java.io.Serializable; * An interface of factory method to create IPartitionManager */ public interface IPartitionManagerFactory extends Serializable { - IPartitionManager create(EventHubSpoutConfig spoutConfig, - String partitionId, - IStateStore stateStore, - IEventHubReceiver receiver); + IPartitionManager create(EventHubSpoutConfig spoutConfig, + String partitionId, + IStateStore stateStore, + IEventHubReceiver receiver); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java index 03c7ae8..311ceb7 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.io.Serializable; public interface IStateStore extends Serializable { - public void open(); + public void open(); - public void close(); + public void close(); - public void saveData(String path, String data); + public void saveData(String path, String data); - public String readData(String path); + public String readData(String path); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java index 59d5c71..0bb1f10 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java @@ -15,42 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; public class MessageId { - private final String partitionId; - private final String offset; - private final long sequenceNumber; - - public MessageId( - String partitionId, - String offset, - long sequenceNumber) { - this.partitionId = partitionId; - this.offset = offset; - this.sequenceNumber = sequenceNumber; - } - - public static MessageId create(String partitionId, String offset, long sequenceNumber) { - return new MessageId(partitionId, offset, sequenceNumber); - } - - public String getPartitionId() { - return this.partitionId; - } - - public String getOffset() { - return this.offset; - } - - public Long getSequenceNumber() { - return this.sequenceNumber; - } - - @Override - public String toString() { - return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s", - this.partitionId, this.offset, this.sequenceNumber); - } + private final String partitionId; + private final String offset; + private final long sequenceNumber; + + public MessageId( + String partitionId, + String offset, + long sequenceNumber) { + this.partitionId = partitionId; + this.offset = offset; + this.sequenceNumber = sequenceNumber; + } + + public static MessageId create(String partitionId, String offset, long sequenceNumber) { + return new MessageId(partitionId, offset, sequenceNumber); + } + + public String getPartitionId() { + return this.partitionId; + } + + public String getOffset() { + return this.offset; + } + + public Long getSequenceNumber() { + return this.sequenceNumber; + } + + @Override + public String toString() { + return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s", + this.partitionId, this.offset, this.sequenceNumber); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java index 20e021a..c5c5084 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java @@ -15,87 +15,87 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.storm.eventhubs.spout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.apache.storm.eventhubs.spout; import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PartitionManager extends SimplePartitionManager { - private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class); - private final int ehReceiveTimeoutMs = 5000; + private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class); + private final int ehReceiveTimeoutMs = 5000; - //all sent events are stored in pending - private final Map<String, EventDataWrap> pending; - //all failed events are put in toResend, which is sorted by event's offset - private final TreeSet<EventDataWrap> toResend; + //all sent events are stored in pending + private final Map<String, EventDataWrap> pending; + //all failed events are put in toResend, which is sorted by event's offset + private final TreeSet<EventDataWrap> toResend; - public PartitionManager( - EventHubSpoutConfig spoutConfig, - String partitionId, - IStateStore stateStore, - IEventHubReceiver receiver) { + public PartitionManager( + EventHubSpoutConfig spoutConfig, + String partitionId, + IStateStore stateStore, + IEventHubReceiver receiver) { - super(spoutConfig, partitionId, stateStore, receiver); - - this.pending = new LinkedHashMap<String, EventDataWrap>(); - this.toResend = new TreeSet<EventDataWrap>(); - } + super(spoutConfig, partitionId, stateStore, receiver); - @Override - public EventDataWrap receive() { - if(pending.size() >= config.getMaxPendingMsgsPerPartition()) { - return null; + this.pending = new LinkedHashMap<String, EventDataWrap>(); + this.toResend = new TreeSet<EventDataWrap>(); } - EventDataWrap eventDatawrap; - if (toResend.isEmpty()) { - eventDatawrap = receiver.receive(); - } else { - eventDatawrap = toResend.pollFirst(); - } + @Override + public EventDataWrap receive() { + if (pending.size() >= config.getMaxPendingMsgsPerPartition()) { + return null; + } - if (eventDatawrap != null) { - lastOffset = eventDatawrap.getMessageId().getOffset(); - pending.put(lastOffset, eventDatawrap); - } + EventDataWrap eventDatawrap; + if (toResend.isEmpty()) { + eventDatawrap = receiver.receive(); + } else { + eventDatawrap = toResend.pollFirst(); + } - return eventDatawrap; - } + if (eventDatawrap != null) { + lastOffset = eventDatawrap.getMessageId().getOffset(); + pending.put(lastOffset, eventDatawrap); + } - @Override - public void ack(String offset) { - pending.remove(offset); - } + return eventDatawrap; + } - @Override - public void fail(String offset) { - logger.warn("fail on " + offset); - EventDataWrap eventDataWrap = pending.remove(offset); - toResend.add(eventDataWrap); - } - - @Override - protected String getCompletedOffset() { - String offset = null; - - if(pending.size() > 0) { - //find the smallest offset in pending list - offset = pending.keySet().iterator().next(); + @Override + public void ack(String offset) { + pending.remove(offset); } - if(toResend.size() > 0) { - //find the smallest offset in toResend list - String offset2 = toResend.first().getMessageId().getOffset(); - if(offset == null || offset2.compareTo(offset) < 0) { - offset = offset2; - } + + @Override + public void fail(String offset) { + logger.warn("fail on " + offset); + EventDataWrap eventDataWrap = pending.remove(offset); + toResend.add(eventDataWrap); } - if(offset == null) { - offset = lastOffset; + + @Override + protected String getCompletedOffset() { + String offset = null; + + if (pending.size() > 0) { + //find the smallest offset in pending list + offset = pending.keySet().iterator().next(); + } + if (toResend.size() > 0) { + //find the smallest offset in toResend list + String offset2 = toResend.first().getMessageId().getOffset(); + if (offset == null || offset2.compareTo(offset) < 0) { + offset = offset2; + } + } + if (offset == null) { + offset = lastOffset; + } + return offset; } - return offset; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java index 25c4261..436aabe 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java @@ -15,119 +15,119 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; +import java.time.Instant; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Instant; -import java.util.Map; /** * A simple partition manager that does not re-send failed messages */ public class SimplePartitionManager implements IPartitionManager { - private static final Logger logger = LoggerFactory.getLogger(SimplePartitionManager.class); - private static final String statePathPrefix = "/eventhubspout"; - - protected final IEventHubReceiver receiver; - protected String lastOffset = "-1"; - protected String committedOffset = "-1"; - - protected final EventHubSpoutConfig config; - private final String partitionId; - private final IStateStore stateStore; - private final String statePath; - - public SimplePartitionManager( - EventHubSpoutConfig spoutConfig, - String partitionId, - IStateStore stateStore, - IEventHubReceiver receiver) { - this.receiver = receiver; - this.config = spoutConfig; - this.partitionId = partitionId; - this.statePath = this.getPartitionStatePath(); - this.stateStore = stateStore; - } - - @Override - public void open() throws Exception { - - //read from state store, if not found, use startingOffset - String offset = stateStore.readData(statePath); - logger.info("read offset from state store: " + offset); - if(offset == null) { - offset = FieldConstants.DefaultStartingOffset; + private static final Logger logger = LoggerFactory.getLogger(SimplePartitionManager.class); + private static final String statePathPrefix = "/eventhubspout"; + + protected final IEventHubReceiver receiver; + protected final EventHubSpoutConfig config; + private final String partitionId; + private final IStateStore stateStore; + private final String statePath; + protected String lastOffset = "-1"; + protected String committedOffset = "-1"; + + public SimplePartitionManager( + EventHubSpoutConfig spoutConfig, + String partitionId, + IStateStore stateStore, + IEventHubReceiver receiver) { + this.receiver = receiver; + this.config = spoutConfig; + this.partitionId = partitionId; + this.statePath = this.getPartitionStatePath(); + this.stateStore = stateStore; + } + + @Override + public void open() throws Exception { + + //read from state store, if not found, use startingOffset + String offset = stateStore.readData(statePath); + logger.info("read offset from state store: " + offset); + if (offset == null) { + offset = FieldConstants.DefaultStartingOffset; + } + IEventFilter filter; + if (offset.equals(FieldConstants.DefaultStartingOffset) + && config.getEnqueueTimeFilter() != 0) { + filter = new EventHubFilter(Instant.ofEpochMilli(config.getEnqueueTimeFilter())); + } else { + filter = new EventHubFilter(offset); + } + + receiver.open(filter); + } + + @Override + public void close() { + this.receiver.close(); + this.checkpoint(); } - IEventFilter filter; - if (offset.equals(FieldConstants.DefaultStartingOffset) - && config.getEnqueueTimeFilter() != 0) { - filter = new EventHubFilter(Instant.ofEpochMilli(config.getEnqueueTimeFilter())); + + @Override + public void checkpoint() { + String completedOffset = getCompletedOffset(); + if (!committedOffset.equals(completedOffset)) { + logger.info("saving state " + completedOffset); + stateStore.saveData(statePath, completedOffset); + committedOffset = completedOffset; + } + } + + protected String getCompletedOffset() { + return lastOffset; } - else{ - filter = new EventHubFilter(offset); + + @Override + public EventDataWrap receive() { + EventDataWrap eventDatawrap = receiver.receive(); + if (eventDatawrap != null) { + lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset(); + } + return eventDatawrap; } - receiver.open(filter); - } - - @Override - public void close() { - this.receiver.close(); - this.checkpoint(); - } - - @Override - public void checkpoint() { - String completedOffset = getCompletedOffset(); - if(!committedOffset.equals(completedOffset)) { - logger.info("saving state " + completedOffset); - stateStore.saveData(statePath, completedOffset); - committedOffset = completedOffset; + @Override + public void ack(String offset) { + //do nothing } - } - - protected String getCompletedOffset() { - return lastOffset; - } - - @Override - public EventDataWrap receive() { - EventDataWrap eventDatawrap = receiver.receive(); - if (eventDatawrap != null) { - lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset(); + + @Override + public void fail(String offset) { + logger.warn("fail on " + offset); + //do nothing + } + + private String getPartitionStatePath() { + + // Partition state path = + // "/{prefix}/{topologyName}/{namespace}/{entityPath}/partitions/{partitionId}/state"; + String namespace = config.getNamespace(); + String entityPath = config.getEntityPath(); + String topologyName = config.getTopologyName(); + + String partitionStatePath = + statePathPrefix + "/" + topologyName + "/" + namespace + "/" + entityPath + "/partitions/" + this.partitionId; + + logger.info("partition state path: " + partitionStatePath); + + return partitionStatePath; + } + + @Override + public Map<String, Object> getMetricsData() { + return receiver.getMetricsData(); } - return eventDatawrap; - } - - @Override - public void ack(String offset) { - //do nothing - } - - @Override - public void fail(String offset) { - logger.warn("fail on " + offset); - //do nothing - } - - private String getPartitionStatePath() { - - // Partition state path = - // "/{prefix}/{topologyName}/{namespace}/{entityPath}/partitions/{partitionId}/state"; - String namespace = config.getNamespace(); - String entityPath = config.getEntityPath(); - String topologyName = config.getTopologyName(); - - String partitionStatePath = statePathPrefix + "/" + topologyName + "/" + namespace + "/" + entityPath + "/partitions/" + this.partitionId; - - logger.info("partition state path: " + partitionStatePath); - - return partitionStatePath; - } - - @Override - public Map<String, Object> getMetricsData() { - return receiver.getMetricsData(); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java index abbecac..54c0f7d 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java @@ -15,70 +15,70 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StaticPartitionCoordinator implements IPartitionCoordinator { - private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class); + private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class); - protected final EventHubSpoutConfig config; - protected final int taskIndex; - protected final int totalTasks; - protected final List<IPartitionManager> partitionManagers; - protected final Map<String, IPartitionManager> partitionManagerMap; - protected final IStateStore stateStore; + protected final EventHubSpoutConfig config; + protected final int taskIndex; + protected final int totalTasks; + protected final List<IPartitionManager> partitionManagers; + protected final Map<String, IPartitionManager> partitionManagerMap; + protected final IStateStore stateStore; - public StaticPartitionCoordinator( - EventHubSpoutConfig spoutConfig, - int taskIndex, - int totalTasks, - IStateStore stateStore, - IPartitionManagerFactory pmFactory, - IEventHubReceiverFactory recvFactory) { + public StaticPartitionCoordinator( + EventHubSpoutConfig spoutConfig, + int taskIndex, + int totalTasks, + IStateStore stateStore, + IPartitionManagerFactory pmFactory, + IEventHubReceiverFactory recvFactory) { - this.config = spoutConfig; - this.taskIndex = taskIndex; - this.totalTasks = totalTasks; - this.stateStore = stateStore; - List<String> partitionIds = calculateParititionIdsToOwn(); - partitionManagerMap = new HashMap<String, IPartitionManager>(); - partitionManagers = new ArrayList<IPartitionManager>(); - - for (String partitionId : partitionIds) { - IEventHubReceiver receiver = recvFactory.create(config, partitionId); - IPartitionManager partitionManager = pmFactory.create( - config, partitionId, stateStore, receiver); - partitionManagerMap.put(partitionId, partitionManager); - partitionManagers.add(partitionManager); - } - } + this.config = spoutConfig; + this.taskIndex = taskIndex; + this.totalTasks = totalTasks; + this.stateStore = stateStore; + List<String> partitionIds = calculateParititionIdsToOwn(); + partitionManagerMap = new HashMap<String, IPartitionManager>(); + partitionManagers = new ArrayList<IPartitionManager>(); - @Override - public List<IPartitionManager> getMyPartitionManagers() { - return partitionManagers; - } + for (String partitionId : partitionIds) { + IEventHubReceiver receiver = recvFactory.create(config, partitionId); + IPartitionManager partitionManager = pmFactory.create( + config, partitionId, stateStore, receiver); + partitionManagerMap.put(partitionId, partitionManager); + partitionManagers.add(partitionManager); + } + } - @Override - public IPartitionManager getPartitionManager(String partitionId) { - return partitionManagerMap.get(partitionId); - } + @Override + public List<IPartitionManager> getMyPartitionManagers() { + return partitionManagers; + } - protected List<String> calculateParititionIdsToOwn() { - List<String> taskPartitions = new ArrayList<String>(); - for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) { - taskPartitions.add(Integer.toString(i)); - logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i)); + @Override + public IPartitionManager getPartitionManager(String partitionId) { + return partitionManagerMap.get(partitionId); } - return taskPartitions; - } + protected List<String> calculateParititionIdsToOwn() { + List<String> taskPartitions = new ArrayList<String>(); + for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) { + taskPartitions.add(Integer.toString(i)); + logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i)); + } + + return taskPartitions; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java index a2024bd..5b6a2b9 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java @@ -15,60 +15,58 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; +import java.util.ArrayList; +import java.util.List; import org.apache.storm.tuple.Fields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - /** - * An Event Data Scheme which deserializes message payload into the Strings. - * No encoding is assumed. The receiver will need to handle parsing of the - * string data in appropriate encoding. + * An Event Data Scheme which deserializes message payload into the Strings. No encoding is assumed. The receiver will need to handle + * parsing of the string data in appropriate encoding. + * + * Note: Unlike other schemes provided, this scheme does not include any metadata. * - * Note: Unlike other schemes provided, this scheme does not include any - * metadata. - * - * For metadata please refer to {@link BinaryEventDataScheme}, {@link EventDataScheme} + * For metadata please refer to {@link BinaryEventDataScheme}, {@link EventDataScheme} */ public class StringEventDataScheme implements IEventDataScheme { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(StringEventDataScheme.class); + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(StringEventDataScheme.class); - @Override - public List<Object> deserialize(EventData eventData) { - final List<Object> fieldContents = new ArrayList<Object>(); - String messageData = ""; - if (eventData.getBytes()!=null) { - messageData = new String(eventData.getBytes()); - } - /*Will only serialize AMQPValue type*/ - else if (eventData.getObject()!=null) { - try { - if (!(eventData.getObject() instanceof List)) { - messageData = eventData.getObject().toString(); - } else { - throw new RuntimeException("Cannot serialize the given AMQP type."); + @Override + public List<Object> deserialize(EventData eventData) { + final List<Object> fieldContents = new ArrayList<Object>(); + String messageData = ""; + if (eventData.getBytes() != null) { + messageData = new String(eventData.getBytes()); } - } catch (RuntimeException e){ - logger.error("Failed to serialize EventData payload class" - + eventData.getObject().getClass()); - logger.error("Exception encountered while serializing EventData payload is" - + e.toString()); - throw e; - } + /*Will only serialize AMQPValue type*/ + else if (eventData.getObject() != null) { + try { + if (!(eventData.getObject() instanceof List)) { + messageData = eventData.getObject().toString(); + } else { + throw new RuntimeException("Cannot serialize the given AMQP type."); + } + } catch (RuntimeException e) { + logger.error("Failed to serialize EventData payload class" + + eventData.getObject().getClass()); + logger.error("Exception encountered while serializing EventData payload is" + + e.toString()); + throw e; + } + } + fieldContents.add(messageData); + return fieldContents; } - fieldContents.add(messageData); - return fieldContents; - } - @Override - public Fields getOutputFields() { - return new Fields(FieldConstants.Message); - } + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java index 063aa4d..4e2ece9 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import org.apache.curator.RetryPolicy; @@ -25,71 +26,71 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZookeeperStateStore implements IStateStore { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class); + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(ZookeeperStateStore.class); - private final String zookeeperConnectionString; - private final CuratorFramework curatorFramework; - - public ZookeeperStateStore(String zookeeperConnectionString) { - this(zookeeperConnectionString, 3, 100); - } + private final String zookeeperConnectionString; + private final CuratorFramework curatorFramework; - public ZookeeperStateStore(String connectionString, int retries, int retryInterval) { - if (connectionString == null) { - zookeeperConnectionString = "localhost:2181"; - } else { - zookeeperConnectionString = connectionString; + public ZookeeperStateStore(String zookeeperConnectionString) { + this(zookeeperConnectionString, 3, 100); } - RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval); - curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); - } + public ZookeeperStateStore(String connectionString, int retries, int retryInterval) { + if (connectionString == null) { + zookeeperConnectionString = "localhost:2181"; + } else { + zookeeperConnectionString = connectionString; + } - @Override - public void open() { - curatorFramework.start(); - } + RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval); + curatorFramework = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); + } - @Override - public void close() { - curatorFramework.close(); - } + @Override + public void open() { + curatorFramework.start(); + } + + @Override + public void close() { + curatorFramework.close(); + } - @Override - public void saveData(String statePath, String data) { - data = data == null ? "" : data; - byte[] bytes = data.getBytes(); + @Override + public void saveData(String statePath, String data) { + data = data == null ? "" : data; + byte[] bytes = data.getBytes(); - try { - if (curatorFramework.checkExists().forPath(statePath) == null) { - curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes); - } else { - curatorFramework.setData().forPath(statePath, bytes); - } + try { + if (curatorFramework.checkExists().forPath(statePath) == null) { + curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes); + } else { + curatorFramework.setData().forPath(statePath, bytes); + } - logger.info(String.format("data was saved. path: %s, data: %s.", statePath, data)); - } catch (Exception e) { - throw new RuntimeException(e); + logger.info(String.format("data was saved. path: %s, data: %s.", statePath, data)); + } catch (Exception e) { + throw new RuntimeException(e); + } } - } - @Override - public String readData(String statePath) { - try { - if (curatorFramework.checkExists().forPath(statePath) == null) { - // do we want to throw an exception if path doesn't exist?? - return null; - } else { - byte[] bytes = curatorFramework.getData().forPath(statePath); - String data = new String(bytes); + @Override + public String readData(String statePath) { + try { + if (curatorFramework.checkExists().forPath(statePath) == null) { + // do we want to throw an exception if path doesn't exist?? + return null; + } else { + byte[] bytes = curatorFramework.getData().forPath(statePath); + String data = new String(bytes); - logger.info(String.format("data was retrieved. path: %s, data: %s.", statePath, data)); + logger.info(String.format("data was retrieved. path: %s, data: %s.", statePath, data)); - return data; - } - } catch (Exception e) { - throw new RuntimeException(e); + return data; + } + } catch (Exception e) { + throw new RuntimeException(e); + } } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java index 253b317..b87c81b 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java @@ -15,46 +15,45 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; +import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; +import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; +import org.apache.storm.trident.spout.IPartitionedTridentSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; -import org.apache.storm.trident.spout.IPartitionedTridentSpout; +public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>, + IOpaquePartitionedTridentSpout.Coordinator<Partitions> { + private static final Logger logger = LoggerFactory.getLogger(Coordinator.class); + private final EventHubSpoutConfig spoutConfig; + Partitions partitions; -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; + public Coordinator(EventHubSpoutConfig spoutConfig) { + this.spoutConfig = spoutConfig; + } -public class Coordinator implements IPartitionedTridentSpout.Coordinator<Partitions>, - IOpaquePartitionedTridentSpout.Coordinator<Partitions> { - private static final Logger logger = LoggerFactory.getLogger(Coordinator.class); - private final EventHubSpoutConfig spoutConfig; - Partitions partitions; - - public Coordinator(EventHubSpoutConfig spoutConfig) { - this.spoutConfig = spoutConfig; - } - - @Override - public void close() { - } - - @Override - public Partitions getPartitionsForBatch() { - if(partitions != null) { - return partitions; + @Override + public void close() { } - - partitions = new Partitions(); - for(int i=0; i<spoutConfig.getPartitionCount(); ++i) { - partitions.addPartition(new Partition(spoutConfig, Integer.toString(i))); + + @Override + public Partitions getPartitionsForBatch() { + if (partitions != null) { + return partitions; + } + + partitions = new Partitions(); + for (int i = 0; i < spoutConfig.getPartitionCount(); ++i) { + partitions.addPartition(new Partition(spoutConfig, Integer.toString(i))); + } + logger.info("created partitions, size=" + spoutConfig.getPartitionCount()); + return partitions; + } + + @Override + public boolean isReady(long txid) { + return true; } - logger.info("created partitions, size=" + spoutConfig.getPartitionCount()); - return partitions; - } - - @Override - public boolean isReady(long txid) { - return true; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java index d1e8b9e..922e334 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java @@ -15,21 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.storm.eventhubs.trident; -import org.apache.storm.eventhubs.spout.EventDataWrap; +package org.apache.storm.eventhubs.trident; import java.util.List; +import org.apache.storm.eventhubs.spout.EventDataWrap; public interface ITridentPartitionManager { - boolean open(String offset); - void close(); - - /** - * receive a batch of messages from EvenHub up to "count" messages - * @param offset the starting offset - * @param count max number of messages in this batch - * @return list of EventData, if failed to receive, return empty list - */ - public List<EventDataWrap> receiveBatch(String offset, int count); + boolean open(String offset); + + void close(); + + /** + * receive a batch of messages from EvenHub up to "count" messages + * + * @param offset the starting offset + * @param count max number of messages in this batch + * @return list of EventData, if failed to receive, return empty list + */ + public List<EventDataWrap> receiveBatch(String offset, int count); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java index 701bd46..15a83cc 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; import java.io.Serializable; - import org.apache.storm.eventhubs.spout.IEventHubReceiver; public interface ITridentPartitionManagerFactory extends Serializable { - ITridentPartitionManager create(IEventHubReceiver receiver); + ITridentPartitionManager create(IEventHubReceiver receiver); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java index 0da421c..4020b21 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java @@ -15,54 +15,55 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; +import java.util.List; +import java.util.Map; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; -import java.util.List; -import java.util.Map; - /** * A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout */ public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> { - private final TransactionalTridentEventHubEmitter transactionalEmitter; - public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) { - transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig); - } - - public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig, - int batchSize, - ITridentPartitionManagerFactory pmFactory, - IEventHubReceiverFactory recvFactory) { - transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig, - batchSize, - pmFactory, - recvFactory); - } + private final TransactionalTridentEventHubEmitter transactionalEmitter; + + public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) { + transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig); + } + + public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig, + int batchSize, + ITridentPartitionManagerFactory pmFactory, + IEventHubReceiverFactory recvFactory) { + transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig, + batchSize, + pmFactory, + recvFactory); + } - @Override - public void close() { - transactionalEmitter.close(); - } + @Override + public void close() { + transactionalEmitter.close(); + } - @Override - public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, - Partition partition, Map meta) { - return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta); - } + @Override + public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, + Partition partition, Map meta) { + return transactionalEmitter.emitPartitionBatchNew(attempt, collector, partition, meta); + } - @Override - public List<Partition> getOrderedPartitions(Partitions partitions) { - return transactionalEmitter.getOrderedPartitions(partitions); - } + @Override + public List<Partition> getOrderedPartitions(Partitions partitions) { + return transactionalEmitter.getOrderedPartitions(partitions); + } - @Override - public void refreshPartitions(List<Partition> partitionList) { - transactionalEmitter.refreshPartitions(partitionList); - } + @Override + public void refreshPartitions(List<Partition> partitionList) { + transactionalEmitter.refreshPartitions(partitionList); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java index 7123304..f062201 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java @@ -15,50 +15,49 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; import java.util.Map; - import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventDataScheme; - import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Fields; import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; +import org.apache.storm.tuple.Fields; /** * Opaque Trident EventHubs Spout */ public class OpaqueTridentEventHubSpout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map> { - private static final long serialVersionUID = 1L; - private final IEventDataScheme scheme; - private final EventHubSpoutConfig spoutConfig; - - public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) { - spoutConfig = config; - scheme = spoutConfig.getEventDataScheme(); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - @Override - public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator( - Map<String, Object> conf, TopologyContext context) { - return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig); - } - - @Override - public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter( - Map<String, Object> conf, TopologyContext context) { - return new OpaqueTridentEventHubEmitter(spoutConfig); - } - - @Override - public Fields getOutputFields() { - return scheme.getOutputFields(); - } + private static final long serialVersionUID = 1L; + private final IEventDataScheme scheme; + private final EventHubSpoutConfig spoutConfig; + + public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) { + spoutConfig = config; + scheme = spoutConfig.getEventDataScheme(); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator( + Map<String, Object> conf, TopologyContext context) { + return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig); + } + + @Override + public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> getEmitter( + Map<String, Object> conf, TopologyContext context) { + return new OpaqueTridentEventHubEmitter(spoutConfig); + } + + @Override + public Fields getOutputFields() { + return scheme.getOutputFields(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java index 8857eec..44d9d23 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; import java.io.Serializable; @@ -25,15 +26,15 @@ import org.apache.storm.trident.spout.ISpoutPartition; * Represents an EventHub partition */ public class Partition implements ISpoutPartition, Serializable { - private static final long serialVersionUID = 1L; - String partitionId; - - public Partition(EventHubSpoutConfig config, String partitionId) { - this.partitionId = partitionId; - } - - @Override - public String getId() { - return partitionId; - } + private static final long serialVersionUID = 1L; + String partitionId; + + public Partition(EventHubSpoutConfig config, String partitionId) { + this.partitionId = partitionId; + } + + @Override + public String getId() { + return partitionId; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java index c3317d9..375f39d 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; import java.io.Serializable; @@ -25,17 +26,18 @@ import java.util.List; * Represents all EventHub partitions a spout is receiving messages from. */ public class Partitions implements Serializable { - private static final long serialVersionUID = 1L; - private List<Partition> partitionList; - public Partitions() { - partitionList = new ArrayList<Partition>(); - } - - public void addPartition(Partition partition) { - partitionList.add(partition); - } - - public List<Partition> getPartitions() { - return partitionList; - } + private static final long serialVersionUID = 1L; + private List<Partition> partitionList; + + public Partitions() { + partitionList = new ArrayList<Partition>(); + } + + public void addPartition(Partition partition) { + partitionList.add(partition); + } + + public List<Partition> getPartitions() { + return partitionList; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java index d3456e0..8bd7bb1 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java @@ -15,145 +15,149 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; -import org.apache.storm.eventhubs.spout.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.storm.eventhubs.spout.EventDataWrap; +import org.apache.storm.eventhubs.spout.EventHubReceiverImpl; +import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; +import org.apache.storm.eventhubs.spout.FieldConstants; +import org.apache.storm.eventhubs.spout.IEventHubReceiver; +import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.spout.IPartitionedTridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class TransactionalTridentEventHubEmitter implements IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> { - private static final Logger logger = LoggerFactory.getLogger(TransactionalTridentEventHubEmitter.class); - private final int batchSize; - private final EventHubSpoutConfig spoutConfig; - private Map<String, ITridentPartitionManager> pmMap; - private ITridentPartitionManagerFactory pmFactory; - private IEventHubReceiverFactory recvFactory; - - public TransactionalTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) { - //use batch size that matches the default credit size - this(spoutConfig, spoutConfig.getReceiverCredits(), null, null); - } - - public TransactionalTridentEventHubEmitter(final EventHubSpoutConfig spoutConfig, - int batchSize, - ITridentPartitionManagerFactory pmFactory, - IEventHubReceiverFactory recvFactory) { - this.spoutConfig = spoutConfig; - this.batchSize = batchSize; - this.pmFactory = pmFactory; - this.recvFactory = recvFactory; - pmMap = new HashMap<String, ITridentPartitionManager>(); - if(this.pmFactory == null) { - this.pmFactory = new ITridentPartitionManagerFactory() { - @Override - public ITridentPartitionManager create(IEventHubReceiver receiver) { - return new TridentPartitionManager(spoutConfig, receiver); - } - }; + private static final Logger logger = LoggerFactory.getLogger(TransactionalTridentEventHubEmitter.class); + private final int batchSize; + private final EventHubSpoutConfig spoutConfig; + private Map<String, ITridentPartitionManager> pmMap; + private ITridentPartitionManagerFactory pmFactory; + private IEventHubReceiverFactory recvFactory; + + public TransactionalTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) { + //use batch size that matches the default credit size + this(spoutConfig, spoutConfig.getReceiverCredits(), null, null); } - if(this.recvFactory == null) { - this.recvFactory = new IEventHubReceiverFactory() { - @Override - public IEventHubReceiver create(EventHubSpoutConfig config, - String partitionId) { - return new EventHubReceiverImpl(config, partitionId); + + public TransactionalTridentEventHubEmitter(final EventHubSpoutConfig spoutConfig, + int batchSize, + ITridentPartitionManagerFactory pmFactory, + IEventHubReceiverFactory recvFactory) { + this.spoutConfig = spoutConfig; + this.batchSize = batchSize; + this.pmFactory = pmFactory; + this.recvFactory = recvFactory; + pmMap = new HashMap<String, ITridentPartitionManager>(); + if (this.pmFactory == null) { + this.pmFactory = new ITridentPartitionManagerFactory() { + @Override + public ITridentPartitionManager create(IEventHubReceiver receiver) { + return new TridentPartitionManager(spoutConfig, receiver); + } + }; + } + if (this.recvFactory == null) { + this.recvFactory = new IEventHubReceiverFactory() { + @Override + public IEventHubReceiver create(EventHubSpoutConfig config, + String partitionId) { + return new EventHubReceiverImpl(config, partitionId); + } + }; } - }; - } - } - - @Override - public void close() { - for(ITridentPartitionManager pm: pmMap.values()) { - pm.close(); - } - } - - /** - * Check if partition manager for a given partiton is created - * if not, create it. - * @param partition - */ - private ITridentPartitionManager getOrCreatePartitionManager(Partition partition) { - ITridentPartitionManager pm; - if(!pmMap.containsKey(partition.getId())) { - IEventHubReceiver receiver = recvFactory.create(spoutConfig, partition.getId()); - pm = pmFactory.create(receiver); - pmMap.put(partition.getId(), pm); } - else { - pm = pmMap.get(partition.getId()); + + @Override + public void close() { + for (ITridentPartitionManager pm : pmMap.values()) { + pm.close(); + } } - return pm; - } - - @Override - public void emitPartitionBatch(TransactionAttempt attempt, - TridentCollector collector, Partition partition, Map<String, Object> meta) { - String offset = (String)meta.get("offset"); - int count = Integer.parseInt((String)meta.get("count")); - logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count); - ITridentPartitionManager pm = getOrCreatePartitionManager(partition); - List<EventDataWrap> listEvents = pm.receiveBatch(offset, count); - if(listEvents.size() != count) { - logger.error("failed to refetch eventhub messages, new count=" + listEvents.size()); - return; + + /** + * Check if partition manager for a given partiton is created if not, create it. + * + * @param partition + */ + private ITridentPartitionManager getOrCreatePartitionManager(Partition partition) { + ITridentPartitionManager pm; + if (!pmMap.containsKey(partition.getId())) { + IEventHubReceiver receiver = recvFactory.create(spoutConfig, partition.getId()); + pm = pmFactory.create(receiver); + pmMap.put(partition.getId(), pm); + } else { + pm = pmMap.get(partition.getId()); + } + return pm; } - for(EventDataWrap ed: listEvents) { - List<Object> tuples = - spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); - collector.emit(tuples); + @Override + public void emitPartitionBatch(TransactionAttempt attempt, + TridentCollector collector, Partition partition, Map<String, Object> meta) { + String offset = (String) meta.get("offset"); + int count = Integer.parseInt((String) meta.get("count")); + logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count); + ITridentPartitionManager pm = getOrCreatePartitionManager(partition); + List<EventDataWrap> listEvents = pm.receiveBatch(offset, count); + if (listEvents.size() != count) { + logger.error("failed to refetch eventhub messages, new count=" + listEvents.size()); + return; + } + + for (EventDataWrap ed : listEvents) { + List<Object> tuples = + spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); + collector.emit(tuples); + } } - } - - @Override - public Map<String, Object> emitPartitionBatchNew(TransactionAttempt attempt, - TridentCollector collector, Partition partition, Map<String, Object> meta) { - ITridentPartitionManager pm = getOrCreatePartitionManager(partition); - String offset = FieldConstants.DefaultStartingOffset; - if(meta != null && meta.containsKey("nextOffset")) { - offset = (String)meta.get("nextOffset"); + + @Override + public Map<String, Object> emitPartitionBatchNew(TransactionAttempt attempt, + TridentCollector collector, Partition partition, Map<String, Object> meta) { + ITridentPartitionManager pm = getOrCreatePartitionManager(partition); + String offset = FieldConstants.DefaultStartingOffset; + if (meta != null && meta.containsKey("nextOffset")) { + offset = (String) meta.get("nextOffset"); + } + //logger.info("emit for partition " + partition.getId() + ", offset=" + offset); + String nextOffset = offset; + + List<EventDataWrap> listEvents = pm.receiveBatch(offset, batchSize); + + for (EventDataWrap ed : listEvents) { + //update nextOffset; + nextOffset = ed.getMessageId().getOffset(); + List<Object> tuples = + spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); + collector.emit(tuples); + } + //logger.info("emitted new batches: " + listEvents.size()); + + Map<String, Object> newMeta = new HashMap<>(); + newMeta.put("offset", offset); + newMeta.put("nextOffset", nextOffset); + newMeta.put("count", "" + listEvents.size()); + return newMeta; } - //logger.info("emit for partition " + partition.getId() + ", offset=" + offset); - String nextOffset = offset; - List<EventDataWrap> listEvents = pm.receiveBatch(offset, batchSize); + @Override + public List<Partition> getOrderedPartitions(Partitions partitions) { + return partitions.getPartitions(); + } - for(EventDataWrap ed: listEvents) { - //update nextOffset; - nextOffset = ed.getMessageId().getOffset(); - List<Object> tuples = - spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); - collector.emit(tuples); + @Override + public void refreshPartitions(List<Partition> partitionList) { + //partition info does not change in EventHub + return; } - //logger.info("emitted new batches: " + listEvents.size()); - - Map<String, Object> newMeta = new HashMap<>(); - newMeta.put("offset", offset); - newMeta.put("nextOffset", nextOffset); - newMeta.put("count", ""+listEvents.size()); - return newMeta; - } - - @Override - public List<Partition> getOrderedPartitions(Partitions partitions) { - return partitions.getPartitions(); - } - - @Override - public void refreshPartitions(List<Partition> partitionList) { - //partition info does not change in EventHub - return; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java index ee3242a..a2eb73d 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java @@ -15,52 +15,50 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; import java.util.Map; - import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventDataScheme; - import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Fields; import org.apache.storm.trident.spout.IPartitionedTridentSpout; -import org.apache.storm.eventhubs.trident.Partition; +import org.apache.storm.tuple.Fields; /** * Transactional Trident EventHub Spout */ -public class TransactionalTridentEventHubSpout implements - IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> { - private static final long serialVersionUID = 1L; - private final IEventDataScheme scheme; - private final EventHubSpoutConfig spoutConfig; - - public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) { - spoutConfig = config; - scheme = spoutConfig.getEventDataScheme(); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } +public class TransactionalTridentEventHubSpout implements + IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> { + private static final long serialVersionUID = 1L; + private final IEventDataScheme scheme; + private final EventHubSpoutConfig spoutConfig; + + public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) { + spoutConfig = config; + scheme = spoutConfig.getEventDataScheme(); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } - @Override - public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator( - Map<String, Object> conf, TopologyContext context) { - return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig); - } + @Override + public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator( + Map<String, Object> conf, TopologyContext context) { + return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig); + } - @Override - public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter( - Map<String, Object> conf, TopologyContext context) { - return new TransactionalTridentEventHubEmitter(spoutConfig); - } + @Override + public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> getEmitter( + Map<String, Object> conf, TopologyContext context) { + return new TransactionalTridentEventHubEmitter(spoutConfig); + } - @Override - public Fields getOutputFields() { - return scheme.getOutputFields(); - } + @Override + public Fields getOutputFields() { + return scheme.getOutputFields(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java index a384667..550e106 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java @@ -15,72 +15,75 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.storm.eventhubs.trident; -import org.apache.storm.eventhubs.spout.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.apache.storm.eventhubs.trident; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import org.apache.storm.eventhubs.spout.EventDataWrap; +import org.apache.storm.eventhubs.spout.EventHubException; +import org.apache.storm.eventhubs.spout.EventHubFilter; +import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; +import org.apache.storm.eventhubs.spout.FieldConstants; +import org.apache.storm.eventhubs.spout.IEventHubReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TridentPartitionManager implements ITridentPartitionManager { - private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class); - private final int receiveTimeoutMs = 5000; - private final IEventHubReceiver receiver; - private final EventHubSpoutConfig spoutConfig; - private String lastOffset = FieldConstants.DefaultStartingOffset; - - public TridentPartitionManager(EventHubSpoutConfig spoutConfig, IEventHubReceiver receiver) { - this.receiver = receiver; - this.spoutConfig = spoutConfig; - } - - @Override - public boolean open(String offset) { - try { - if((offset == null || offset.equals(FieldConstants.DefaultStartingOffset)) - && spoutConfig.getEnqueueTimeFilter() != 0) { - receiver.open(new EventHubFilter(Instant.ofEpochMilli(spoutConfig.getEnqueueTimeFilter()))); - } - else { - receiver.open(new EventHubFilter(offset)); - } - lastOffset = offset; - return true; + private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class); + private final int receiveTimeoutMs = 5000; + private final IEventHubReceiver receiver; + private final EventHubSpoutConfig spoutConfig; + private String lastOffset = FieldConstants.DefaultStartingOffset; + + public TridentPartitionManager(EventHubSpoutConfig spoutConfig, IEventHubReceiver receiver) { + this.receiver = receiver; + this.spoutConfig = spoutConfig; } - catch(EventHubException ex) { - logger.error("failed to open eventhub receiver: " + ex.getMessage()); - return false; + + @Override + public boolean open(String offset) { + try { + if ((offset == null || offset.equals(FieldConstants.DefaultStartingOffset)) + && spoutConfig.getEnqueueTimeFilter() != 0) { + receiver.open(new EventHubFilter(Instant.ofEpochMilli(spoutConfig.getEnqueueTimeFilter()))); + } else { + receiver.open(new EventHubFilter(offset)); + } + lastOffset = offset; + return true; + } catch (EventHubException ex) { + logger.error("failed to open eventhub receiver: " + ex.getMessage()); + return false; + } } - } - - @Override - public void close() { - receiver.close(); - } - - @Override - public List<EventDataWrap> receiveBatch(String offset, int count) { - List<EventDataWrap> batch = new ArrayList<EventDataWrap>(count); - if(!offset.equals(lastOffset) || !receiver.isOpen()) { - //re-establish connection to eventhub servers using the right offset - //TBD: might be optimized with cache. - close(); - if(!open(offset)) { - return batch; - } + + @Override + public void close() { + receiver.close(); } - - for(int i=0; i<count; ++i) { - EventDataWrap ed = receiver.receive(); - if(ed == null) { - break; - } - batch.add(ed); - lastOffset = ed.getMessageId().getOffset(); + + @Override + public List<EventDataWrap> receiveBatch(String offset, int count) { + List<EventDataWrap> batch = new ArrayList<EventDataWrap>(count); + if (!offset.equals(lastOffset) || !receiver.isOpen()) { + //re-establish connection to eventhub servers using the right offset + //TBD: might be optimized with cache. + close(); + if (!open(offset)) { + return batch; + } + } + + for (int i = 0; i < count; ++i) { + EventDataWrap ed = receiver.receive(); + if (ed == null) { + break; + } + batch.add(ed); + lastOffset = ed.getMessageId().getOffset(); + } + return batch; } - return batch; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java index b0dd33a..2c8ef2e 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java @@ -15,10 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.samples; import java.io.Serializable; - import org.apache.storm.eventhubs.spout.EventHubSpout; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; @@ -28,27 +28,27 @@ import org.apache.storm.eventhubs.spout.IStateStore; import org.apache.storm.eventhubs.spout.SimplePartitionManager; public class AtMostOnceEventCount extends EventCount implements Serializable { - @Override - protected EventHubSpout createEventHubSpout() { - IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() { - private static final long serialVersionUID = 1L; + public static void main(String[] args) throws Exception { + AtMostOnceEventCount scenario = new AtMostOnceEventCount(); + + scenario.runScenario(args); + } - @Override - public IPartitionManager create(EventHubSpoutConfig spoutConfig, - String partitionId, IStateStore stateStore, - IEventHubReceiver receiver) { - return new SimplePartitionManager(spoutConfig, partitionId, - stateStore, receiver); - } - }; - EventHubSpout eventHubSpout = new EventHubSpout( - spoutConfig, null, pmFactory, null); - return eventHubSpout; - } - - public static void main(String[] args) throws Exception { - AtMostOnceEventCount scenario = new AtMostOnceEventCount(); + @Override + protected EventHubSpout createEventHubSpout() { + IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() { + private static final long serialVersionUID = 1L; - scenario.runScenario(args); - } + @Override + public IPartitionManager create(EventHubSpoutConfig spoutConfig, + String partitionId, IStateStore stateStore, + IEventHubReceiver receiver) { + return new SimplePartitionManager(spoutConfig, partitionId, + stateStore, receiver); + } + }; + EventHubSpout eventHubSpout = new EventHubSpout( + spoutConfig, null, pmFactory, null); + return eventHubSpout; + } }
