Repository: samza
Updated Branches:
  refs/heads/master 3e7f2e52b -> 032a16079


SAMZA-1785: add retry logic in eventhubs system consumer for non transient error

Implement retry logic in the EventHubs system consumer because there is no
nurse job on Azure and no retry logic in Samza standalone.

The retry logic can be tuned through config to control the maximum number of
retries allowed within a sliding time window, as well as the minimum interval
between two retries.

Author: Hai Lu <[email protected]>

Reviewers: Jagadish<[email protected]>

Closes #587 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/032a1607
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/032a1607
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/032a1607

Branch: refs/heads/master
Commit: 032a16079db8cbb1caf00117b985f7575d2c823a
Parents: 3e7f2e5
Author: Hai Lu <[email protected]>
Authored: Mon Jul 30 09:28:16 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon Jul 30 09:28:16 2018 -0700

----------------------------------------------------------------------
 .../samza/system/eventhub/EventHubConfig.java   |  40 ++++++
 .../consumer/EventHubSystemConsumer.java        | 123 +++++++++++++------
 .../MockEventHubClientManagerFactory.java       |   4 +
 .../consumer/TestEventHubSystemConsumer.java    |  94 ++++++++++++++
 .../org/apache/samza/testUtils/TestClock.java   |  45 +++++++
 .../org/apache/samza/testUtils/TestClock.java   |  45 -------
 6 files changed, 266 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/032a1607/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 61f823c..4e1e3bb 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -85,6 +85,19 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_PER_PARTITION_CONNECTION = 
"systems.%s.eventhubs.perPartition.connection";
   public static final Boolean DEFAULT_CONFIG_PER_PARTITION_CONNECTION = true;
 
+  /*
+   * This set of configs control the max retry count allowed within a certain 
sliding window, as well as
+   * the minimum interval between two retries.
+   * For example, if max retry count is 10, window size is 1 day, min retry 
interval is 10 min, then
+   * we retry up to 10 times within 1 day time frame and we only retry 10 min 
after the last retry.
+   */
+  public static final String CONFIG_MAX_RETRY_COUNT = 
"systems.%s.eventhubs.max.retry.count";
+  public static final long DEFAULT_CONFIG_MAX_RETRIES_COUNT = 3;
+  public static final String CONFIG_RETRY_WINDOW_MS = 
"systems.%s.eventhubs.retry.window.ms";
+  public static final long DEFAULT_CONFIG_RETRY_WINDOW_MS = 
Duration.ofHours(3).toMillis();
+  public static final String CONFIG_MIN_RETRY_INTERVAL_MS = 
"systems.%s.eventhubs.min.retry.interval.ms";
+  public static final long DEFAULT_CONFIG_RETRY_INTERVAL_MS = 
Duration.ofMinutes(3).toMillis();
+
   private final Map<String, String> physcialToId = new HashMap<>();
 
   private static final Logger LOG = 
LoggerFactory.getLogger(EventHubConfig.class);
@@ -310,4 +323,31 @@ public class EventHubConfig extends MapConfig {
     }
     return Boolean.valueOf(isPerPartitionConnection);
   }
+
+  /**
+   * Get max retry count allowed before propagating the exception to users
+   * @param systemaName name of the system
+   * @return long, max retry count allowed
+   */
+  public long getMaxRetryCount(String systemaName) {
+    return getLong(String.format(CONFIG_MAX_RETRY_COUNT, systemaName), 
DEFAULT_CONFIG_MAX_RETRIES_COUNT);
+  }
+
+  /**
+   * Get the sliding window size in ms for tracking the retry count
+   * @param systemName name of the system
+   * @return long, sliding window size in ms
+   */
+  public long getRetryWindowMs(String systemName) {
+    return getLong(String.format(CONFIG_RETRY_WINDOW_MS, systemName), 
DEFAULT_CONFIG_RETRY_WINDOW_MS);
+  }
+
+  /**
+   * Get the minimum interval in ms between two retries on non transient error
+   * @param systemName name of the system
+   * @return long, minimum interval in ms between retries
+   */
+  public long getMinRetryIntervalMs(String systemName) {
+    return getLong(String.format(CONFIG_MIN_RETRY_INTERVAL_MS, systemName), 
DEFAULT_CONFIG_RETRY_INTERVAL_MS);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/032a1607/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 6b3f344..454fc57 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system.eventhub.consumer;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.EventPosition;
@@ -33,6 +34,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -43,6 +47,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SlidingTimeWindowReservoir;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.eventhub.EventHubClientManager;
@@ -53,6 +58,7 @@ import 
org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
 import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
 import org.apache.samza.util.ShutdownUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,22 +143,39 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
   private final String systemName;
   private final EventHubClientManagerFactory eventHubClientManagerFactory;
 
-  // Partition receiver error propagation
-  private final AtomicReference<Throwable> eventHubHandlerError = new 
AtomicReference<>(null);
+  // Partition receiver non transient error propagation
+  private final AtomicReference<Throwable> eventHubNonTransientError = new 
AtomicReference<>(null);
+
+  private final ExecutorService reconnectTaskRunner = 
Executors.newSingleThreadExecutor(
+      new 
ThreadFactoryBuilder().setNameFormat("EventHubs-Reconnect-Task").setDaemon(true).build());
+  private long lastRetryTs = 0;
+
+  private final Clock clock;
+  @VisibleForTesting
+  final SlidingTimeWindowReservoir recentRetryAttempts;
+  @VisibleForTesting
+  volatile Future reconnectTaskStatus = null;
 
   public EventHubSystemConsumer(EventHubConfig config, String systemName,
       EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, 
Interceptor> interceptors,
       MetricsRegistry registry) {
-    super(registry, System::currentTimeMillis);
+    this(config, systemName, eventHubClientManagerFactory, interceptors, 
registry, System::currentTimeMillis);
+  }
+
+  EventHubSystemConsumer(EventHubConfig config, String systemName,
+      EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, 
Interceptor> interceptors,
+      MetricsRegistry registry, Clock clock) {
+    super(registry, clock);
 
     this.config = config;
+    this.clock = clock;
     this.systemName = systemName;
     this.interceptors = interceptors;
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
     List<String> streamIds = config.getStreams(systemName);
     prefetchCount = config.getPrefetchCount(systemName);
 
-
+    recentRetryAttempts = new 
SlidingTimeWindowReservoir(config.getRetryWindowMs(systemName), clock);
 
     // Initiate metrics
     eventReadRates =
@@ -231,14 +254,9 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
     return eventHubClientManager;
   }
 
-  @Override
-  public void start() {
-    if (isStarted) {
-      LOG.warn("Trying to start EventHubSystemConsumer while it's already 
started. Ignore the request.");
-      return;
-    }
-    isStarted = true;
+  private synchronized void initializeEventHubsManagers() {
     LOG.info("Starting EventHubSystemConsumer. Count of SSPs registered: " + 
streamPartitionOffsets.entrySet().size());
+    eventHubNonTransientError.set(null);
     // Create receivers for Event Hubs
     for (Map.Entry<SystemStreamPartition, String> entry : 
streamPartitionOffsets.entrySet()) {
       SystemStreamPartition ssp = entry.getKey();
@@ -289,25 +307,43 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
       }
       LOG.info(String.format("Connection successfully started for 
namespace=%s, entity=%s ", namespace, entityPath));
     }
+  }
+
+  @Override
+  public void start() {
+    if (isStarted) {
+      LOG.warn("Trying to start EventHubSystemConsumer while it's already 
started. Ignore the request.");
+      return;
+    }
+    isStarted = true;
+    initializeEventHubsManagers();
     LOG.info("EventHubSystemConsumer started");
   }
 
   @Override
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
       Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws 
InterruptedException {
-    Throwable handlerError = eventHubHandlerError.get();
-
-    if (handlerError != null) {
-      if (isErrorTransient(handlerError)) {
-        // Log a warning if the error is transient
-        // Partition receiver handler OnError should have handled it by 
recreating the receiver
-        LOG.warn("Received a transient error from event hub partition 
receiver, restarted receiver", handlerError);
+    Throwable handlerError = eventHubNonTransientError.get();
+    /*
+     * We will retry for non transient error by instantiating a new EventHubs 
client if
+     * 1. Last retry happened more than CONFIG_MIN_RETRY_INTERVAL_MS ms ago. 
Otherwise we ignore
+     * 2. We haven't reached CONFIG_MAX_RETRY_COUNT allowed within the 
CONFIG_RETRY_WINDOW_MS window.
+     *    Otherwise we throw
+     */
+    if (handlerError != null && clock.currentTimeMillis() - lastRetryTs > 
config.getMinRetryIntervalMs(systemName)) {
+      int currentRetryCount = recentRetryAttempts.size();
+      long maxRetryCount = config.getMaxRetryCount(systemName);
+      if (currentRetryCount < maxRetryCount) {
+        LOG.warn("Received non transient error. Will retry.", handlerError);
+        LOG.info("Current retry count within window: {}. max retry count 
allowed: {}. window size: {} ms",
+            currentRetryCount, maxRetryCount, 
config.getRetryWindowMs(systemName));
+        long now = clock.currentTimeMillis();
+        recentRetryAttempts.update(now);
+        lastRetryTs = now;
+        reconnectTaskStatus = 
reconnectTaskRunner.submit(this::renewEventHubsClient);
       } else {
-        // Propagate the error to user if the throwable is either
-        // 1. permanent ServiceBusException error from client
-        // 2. SamzaException thrown bu the EventHubConsumer
-        //   2a. Interrupted during put operation to BEM
-        //   2b. Failure in renewing the Partititon Receiver
+        LOG.error("Retries exhausted. Reached max allowed retries: ({}) within 
window {} ms", currentRetryCount,
+            config.getRetryWindowMs(systemName));
         String msg = "Received a non transient error from event hub partition 
receiver";
         throw new SamzaException(msg, handlerError);
       }
@@ -316,6 +352,17 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
     return super.poll(systemStreamPartitions, timeout);
   }
 
+  private synchronized void renewEventHubsClient() {
+    try {
+      LOG.info("Start to renew eventhubs client");
+      shutdownEventHubsManagers(); // The shutdown is in parallel and time 
bounded
+      initializeEventHubsManagers();
+    } catch (Exception e) {
+      LOG.error("Failed to renew eventhubs client", e);
+      eventHubNonTransientError.set(e);
+    }
+  }
+
   private void renewPartitionReceiver(SystemStreamPartition ssp) {
     String streamId = config.getStreamId(ssp.getStream());
     EventHubClientManager eventHubClientManager = 
perPartitionEventHubManagers.get(ssp);
@@ -341,15 +388,12 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
       receiver.setReceiveHandler(streamPartitionHandlers.get(ssp));
       streamPartitionReceivers.put(ssp, receiver);
     } catch (Exception e) {
-      eventHubHandlerError.set(new SamzaException(
+      eventHubNonTransientError.set(new SamzaException(
           String.format("Failed to recreate receiver for EventHubs after 
ReceiverHandlerError (ssp=%s)", ssp), e));
     }
   }
 
-  @Override
-  public void stop() {
-    LOG.info("Stopping event hub system consumer...");
-
+  private synchronized void shutdownEventHubsManagers() {
     // There could be potentially many Receivers and EventHubManagers, so 
close the managers in parallel
     LOG.info("Start shutting down eventhubs receivers");
     
ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver
 -> new Runnable() {
@@ -377,16 +421,19 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
 
     perPartitionEventHubManagers.clear();
     perStreamEventHubManagers.clear();
-    isStarted = false;
-    LOG.info("Event hub system consumer stopped.");
   }
 
-  private boolean isErrorTransient(Throwable throwable) {
-    if (throwable instanceof EventHubException) {
-      EventHubException eventHubException = (EventHubException) throwable;
-      return eventHubException.getIsTransient();
+  @Override
+  public void stop() {
+    LOG.info("Stopping event hub system consumer...");
+    try {
+      reconnectTaskRunner.shutdown();
+      shutdownEventHubsManagers();
+      isStarted = false;
+    } catch (Exception e) {
+      LOG.warn("Exception during stop.", e);
     }
-    return false;
+    LOG.info("Event hub system consumer stopped.");
   }
 
   @Override
@@ -471,10 +518,6 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
         EventHubException busException = (EventHubException) throwable;
 
         if (busException.getIsTransient()) {
-
-          // Only set to transient throwable if there has been no previous 
errors
-          eventHubHandlerError.compareAndSet(null, throwable);
-
           LOG.warn(
               String.format("Received transient exception from EH client. 
Renew partition receiver for ssp: %s", ssp),
               throwable);
@@ -492,7 +535,7 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
 
       LOG.error(String.format("Received non transient exception from EH client 
for ssp: %s", ssp), throwable);
       // Propagate non transient or unknown errors
-      eventHubHandlerError.set(throwable);
+      eventHubNonTransientError.set(throwable);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/032a1607/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
index 6ee9bcf..3b4f1ec 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
@@ -70,6 +70,10 @@ public class MockEventHubClientManagerFactory extends 
EventHubClientManagerFacto
     handlers.forEach((ssp, value) -> value.onReceive(eventData.get(ssp)));
   }
 
+  public void triggerError(Map<SystemStreamPartition, PartitionReceiveHandler> 
handlers, Throwable e) {
+    handlers.forEach((ssp, value) -> value.onError(e));
+  }
+
   public EventPosition getPartitionOffset(String partitionId) {
     return startingOffsets.getOrDefault(partitionId, null);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/032a1607/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index 6e055c6..47e4656 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -29,6 +29,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.eventhub.*;
 import org.apache.samza.system.eventhub.admin.PassThroughInterceptor;
 import org.apache.samza.system.eventhub.producer.SwapFirstLastByteInterceptor;
+import org.apache.samza.testUtils.TestClock;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,6 +37,8 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -349,4 +352,95 @@ public class TestEventHubSystemConsumer {
     
Assert.assertEquals(counters2.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount(),
 numEvents);
     
Assert.assertEquals(counters2.get(EventHubSystemConsumer.READ_ERRORS).getCount(),
 0);
   }
+
+  @Test
+  public void testNonTransientErrorRetry() throws Exception {
+    String systemName = "eventhubs";
+    String streamName = "testNonTransientErrorRetry";
+    int numEvents = 10; // needs to be less than BLOCKING_QUEUE_SIZE
+    int partitionId = 0;
+    TestClock testClock = new TestClock();
+
+    TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+    Map<SystemStreamPartition, List<EventData>> eventData = new HashMap<>();
+    SystemStreamPartition ssp = new SystemStreamPartition(systemName, 
streamName, new Partition(partitionId));
+    Map<String, Interceptor> interceptors = new HashMap<>();
+    interceptors.put(streamName, new PassThroughInterceptor());
+
+    // create EventData
+    List<EventData> singlePartitionEventData = 
MockEventData.generateEventData(numEvents);
+    eventData.put(ssp, singlePartitionEventData);
+
+    // Set configs
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName), streamName);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, 
streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, 
streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, 
streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, 
streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_MAX_RETRY_COUNT, 
systemName), "1");
+    MapConfig config = new MapConfig(configMap);
+
+    MockEventHubClientManagerFactory eventHubClientWrapperFactory = new 
MockEventHubClientManagerFactory(eventData);
+
+    EventHubSystemConsumer consumer =
+        new EventHubSystemConsumer(new EventHubConfig(config), systemName, 
eventHubClientWrapperFactory, interceptors,
+            testMetrics, testClock);
+    consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM);
+    consumer.start();
+
+    // 1st error should retry instead of throw
+    testClock.advanceTime(System.currentTimeMillis());
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    // assert that the reconnect task was submitted and completed eventually
+    Assert.assertNotNull("reconnect task should have been submitted", 
consumer.reconnectTaskStatus);
+    Future lastReconnectTask = consumer.reconnectTaskStatus;
+    lastReconnectTask.get(10000, TimeUnit.MILLISECONDS); // should return 
instantaneously
+    Assert.assertEquals(consumer.recentRetryAttempts.size(), 1);
+
+    // after retry should receive events normally
+    testClock.advanceTime(1);
+    
eventHubClientWrapperFactory.sendToHandlers(consumer.streamPartitionHandlers);
+    List<IncomingMessageEnvelope> result = 
consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    verifyEvents(result, singlePartitionEventData);
+    Assert.assertEquals(testMetrics.getCounters(streamName).size(), 3);
+    Assert.assertEquals(testMetrics.getGauges(streamName).size(), 2);
+    Map<String, Counter> counters =
+        
testMetrics.getCounters(streamName).stream().collect(Collectors.toMap(Counter::getName,
 Function.identity()));
+    
Assert.assertEquals(counters.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount(),
 numEvents);
+
+    // 2nd error: advance into next window, the older retry should have been 
evicted so this error should cause retry
+    testClock.advanceTime(EventHubConfig.DEFAULT_CONFIG_RETRY_WINDOW_MS + 1);
+    Assert.assertEquals(consumer.recentRetryAttempts.size(), 0);
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    Assert.assertNotNull("reconnect task should have been submitted", 
consumer.reconnectTaskStatus);
+    lastReconnectTask = consumer.reconnectTaskStatus;
+    lastReconnectTask.get(10000, TimeUnit.MILLISECONDS); // should return 
instantaneously
+    Assert.assertEquals(consumer.recentRetryAttempts.size(), 1);
+
+    // 3rd error: 1 ms is within the min retry interval; so poll should do 
nothing
+    testClock.advanceTime(1);
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    Assert.assertEquals("there shouldn't be another retry task within min 
retry interval", consumer.reconnectTaskStatus,
+        lastReconnectTask);
+
+    // 4th error: now the poll should throw
+    testClock.advanceTime(EventHubConfig.DEFAULT_CONFIG_RETRY_INTERVAL_MS + 1);
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    try {
+      consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+      Assert.fail("poll should have thrown");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getCause().getMessage(), "test");
+    }
+
+    
Assert.assertEquals(counters.get(EventHubSystemConsumer.READ_ERRORS).getCount(),
 4);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/032a1607/samza-core/src/main/java/org/apache/samza/testUtils/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/testUtils/TestClock.java 
b/samza-core/src/main/java/org/apache/samza/testUtils/TestClock.java
new file mode 100644
index 0000000..710ebda
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/testUtils/TestClock.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.util.Clock;
+
+import java.time.Duration;
+
+/**
+ * An implementation of {@link Clock} that allows to advance the time by an 
arbitrary duration.
+ * Used for testing.
+ */
+public class TestClock implements Clock {
+
+  long currentTime = 1;
+
+  public void advanceTime(Duration duration) {
+    currentTime += duration.toMillis();
+  }
+
+  public void advanceTime(long millis) {
+    currentTime += millis;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/032a1607/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java 
b/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java
deleted file mode 100644
index 710ebda..0000000
--- a/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.testUtils;
-
-import org.apache.samza.util.Clock;
-
-import java.time.Duration;
-
-/**
- * An implementation of {@link Clock} that allows to advance the time by an 
arbitrary duration.
- * Used for testing.
- */
-public class TestClock implements Clock {
-
-  long currentTime = 1;
-
-  public void advanceTime(Duration duration) {
-    currentTime += duration.toMillis();
-  }
-
-  public void advanceTime(long millis) {
-    currentTime += millis;
-  }
-
-  @Override
-  public long currentTimeMillis() {
-    return currentTime;
-  }
-}

Reply via email to