[ 
https://issues.apache.org/jira/browse/BEAM-4798?focusedWorklogId=133615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133615
 ]

ASF GitHub Bot logged work on BEAM-4798:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Aug/18 14:37
            Start Date: 10/Aug/18 14:37
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6177: [BEAM-4798] Fix 
IndexOutOfBoundsException in Flink runner
URL: https://github.com/apache/beam/pull/6177
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 32c8caac96b..a90989218bb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -202,7 +202,16 @@ public void 
run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th
     ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> 
readerInvoker =
         new ReaderInvocationUtil<>(stepName, serializedOptions.get(), 
metricContainer);
 
-    if (localReaders.size() == 1) {
+    if (localReaders.size() == 0) {
+      // It can happen when value of parallelism is greater than number of IO 
readers (for example,
+      // parallelism is 2 and number of Kafka topic partitions is 1). In this 
case, we just fall
+      // through to idle this executor.
+      LOG.info("Number of readers is 0 for this task executor, idle");
+
+      // set this, so that the later logic will emit a final watermark and 
then decide whether
+      // to idle or not
+      isRunning = false;
+    } else if (localReaders.size() == 1) {
       // the easy case, we just read from one reader
       UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
 
@@ -437,15 +446,14 @@ public void onProcessingTime(long timestamp) {
   private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
     if (this.isRunning) {
       long watermarkInterval = 
runtime.getExecutionConfig().getAutoWatermarkInterval();
-      long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
-      runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, 
this);
+      synchronized (context.getCheckpointLock()) {
+        long timeToNextWatermark =
+            runtime.getProcessingTimeService().getCurrentProcessingTime() + 
watermarkInterval;
+        runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, 
this);
+      }
     }
   }
 
-  private long getTimeToNextWatermark(long watermarkInterval) {
-    return System.currentTimeMillis() + watermarkInterval;
-  }
-
   /** Visible so that we can check this in tests. Must not be used for 
anything else. */
   @VisibleForTesting
   public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> 
getSplitSources() {
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index afb6af16b30..e58907e05e4 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -30,6 +30,7 @@
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -52,7 +53,7 @@
   private final int shardNumber;
   private final boolean dedup;
   private final boolean throwOnFirstSnapshot;
-  private final boolean allowSplitting;
+  private final int fixedNumSplits;
 
   /**
    * We only allow an exception to be thrown from getCheckpointMark at most 
once. This must be
@@ -66,27 +67,30 @@ public static void setFinalizeTracker(List<Integer> 
finalizeTracker) {
   }
 
   public TestCountingSource(int numMessagesPerShard) {
-    this(numMessagesPerShard, 0, false, false, true);
+    this(numMessagesPerShard, 0, false, false, -1);
   }
 
   public TestCountingSource withDedup() {
-    return new TestCountingSource(
-        numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, true);
+    return new TestCountingSource(numMessagesPerShard, shardNumber, true, 
throwOnFirstSnapshot, -1);
   }
 
   private TestCountingSource withShardNumber(int shardNumber) {
     return new TestCountingSource(
-        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, -1);
   }
 
   public TestCountingSource withThrowOnFirstSnapshot(boolean 
throwOnFirstSnapshot) {
     return new TestCountingSource(
-        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, -1);
   }
 
   public TestCountingSource withoutSplitting() {
+    return new TestCountingSource(numMessagesPerShard, shardNumber, dedup, 
throwOnFirstSnapshot, 1);
+  }
+
+  public TestCountingSource withFixedNumSplits(int maxNumSplits) {
     return new TestCountingSource(
-        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false);
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, 
maxNumSplits);
   }
 
   private TestCountingSource(
@@ -94,12 +98,12 @@ private TestCountingSource(
       int shardNumber,
       boolean dedup,
       boolean throwOnFirstSnapshot,
-      boolean allowSplitting) {
+      int fixedNumSplits) {
     this.numMessagesPerShard = numMessagesPerShard;
     this.shardNumber = shardNumber;
     this.dedup = dedup;
     this.throwOnFirstSnapshot = throwOnFirstSnapshot;
-    this.allowSplitting = allowSplitting;
+    this.fixedNumSplits = fixedNumSplits;
   }
 
   public int getShardNumber() {
@@ -109,8 +113,8 @@ public int getShardNumber() {
   @Override
   public List<TestCountingSource> split(int desiredNumSplits, PipelineOptions 
options) {
     List<TestCountingSource> splits = new ArrayList<>();
-    int numSplits = allowSplitting ? desiredNumSplits : 1;
-    for (int i = 0; i < numSplits; i++) {
+    int actualNumSplits = (fixedNumSplits == -1) ? desiredNumSplits : 
fixedNumSplits;
+    for (int i = 0; i < actualNumSplits; i++) {
       splits.add(withShardNumber(i));
     }
     return splits;
@@ -199,6 +203,11 @@ public TestCountingSource getCurrentSource() {
 
     @Override
     public Instant getWatermark() {
+      if (current >= numMessagesPerShard - 1) {
+        // we won't emit further data, signal this with the final watermark
+        return new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE);
+      }
+
       // The watermark is a promise about future elements, and the timestamps 
of elements are
       // strictly increasing for this source.
       return new Instant(current + 1);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 072cdcac6dd..74db90a7d07 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -33,6 +33,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.ValueWithRecordId;
@@ -59,16 +60,16 @@
 
   /** Parameterized tests. */
   @RunWith(Parameterized.class)
-  public static class UnboundedSourceWrapperTestWithParams {
+  public static class ParameterizedUnboundedSourceWrapperTest {
     private final int numTasks;
     private final int numSplits;
 
-    public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
+    public ParameterizedUnboundedSourceWrapperTest(int numTasks, int 
numSplits) {
       this.numTasks = numTasks;
       this.numSplits = numSplits;
     }
 
-    @Parameterized.Parameters
+    @Parameterized.Parameters(name = "numTasks = {0}; numSplits={1}")
     public static Collection<Object[]> data() {
       /*
        * Parameters for initializing the tests:
@@ -89,75 +90,113 @@ public UnboundedSourceWrapperTestWithParams(int numTasks, 
int numSplits) {
      */
     @Test
     public void testValueEmission() throws Exception {
-      final int numElements = 20;
+      final int numElementsPerShard = 20;
       final Object checkpointLock = new Object();
       PipelineOptions options = PipelineOptionsFactory.create();
 
-      // this source will emit exactly NUM_ELEMENTS across all parallel 
readers,
+      final long[] numElementsReceived = {0L};
+      final int[] numWatermarksReceived = {0};
+
+      // this source will emit exactly NUM_ELEMENTS for each parallel reader,
       // afterwards it will stall. We check whether we also receive 
NUM_ELEMENTS
       // elements later.
-      TestCountingSource source = new TestCountingSource(numElements);
-      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
-
-      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
-      StreamSource<
-              WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
-              UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark>>
-          sourceOperator = new StreamSource<>(flinkWrapper);
-
-      
AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, 
Integer>>>>
-          testHarness =
-              new AbstractStreamOperatorTestHarness<>(
-                  sourceOperator,
-                  numTasks /* max parallelism */,
-                  numTasks /* parallelism */,
-                  0 /* subtask index */);
-
-      testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
-
-      try {
-        testHarness.open();
-        sourceOperator.run(
-            checkpointLock,
-            new TestStreamStatusMaintainer(),
-            new 
Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
-              private int count = 0;
-
-              @Override
-              public void emitWatermark(Watermark watermark) {}
-
+      TestCountingSource source =
+          new 
TestCountingSource(numElementsPerShard).withFixedNumSplits(numSplits);
+
+      for (int subtaskIndex = 0; subtaskIndex < numTasks; subtaskIndex++) {
+        UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+            new UnboundedSourceWrapper<>("stepName", options, source, 
numTasks);
+
+        // the source wrapper will only request as many splits as there are 
tasks and the source
+        // will create at most numSplits splits
+        assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+        StreamSource<
+                WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+                UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark>>
+            sourceOperator = new StreamSource<>(flinkWrapper);
+
+        
AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, 
Integer>>>>
+            testHarness =
+                new AbstractStreamOperatorTestHarness<>(
+                    sourceOperator,
+                    numTasks /* max parallelism */,
+                    numTasks /* parallelism */,
+                    subtaskIndex /* subtask index */);
+
+        testHarness.setProcessingTime(System.currentTimeMillis());
+
+        // start a thread that advances processing time, so that we eventually 
get the final
+        // watermark which is only updated via a processing-time trigger
+        Thread processingTimeUpdateThread =
+            new Thread() {
               @Override
-              public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> 
streamRecord) {
-                collect((StreamRecord) streamRecord);
+              public void run() {
+                while (true) {
+                  try {
+                    synchronized (testHarness.getCheckpointLock()) {
+                      
testHarness.setProcessingTime(System.currentTimeMillis());
+                    }
+                    Thread.sleep(1000);
+                  } catch (Exception e) {
+                    break;
+                  }
+                }
               }
+            };
+        processingTimeUpdateThread.start();
+
+        testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        try {
+          testHarness.open();
+          sourceOperator.run(
+              checkpointLock,
+              new TestStreamStatusMaintainer(),
+              new 
Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
+                private boolean hasSeenMaxWatermark = false;
+
+                @Override
+                public void emitWatermark(Watermark watermark) {
+                  // we get this when there is no more data
+                  // it can happen that we get the max watermark several 
times, so guard against
+                  // this
+                  if (!hasSeenMaxWatermark
+                      && watermark.getTimestamp()
+                          >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+                    numWatermarksReceived[0]++;
+                    hasSeenMaxWatermark = true;
+                  }
+                }
 
-              @Override
-              public void emitLatencyMarker(LatencyMarker latencyMarker) {}
-
-              @Override
-              public void collect(
-                  StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, 
Integer>>>>
-                      windowedValueStreamRecord) {
-
-                count++;
-                if (count >= numElements) {
-                  throw new SuccessException();
+                @Override
+                public <X> void collect(OutputTag<X> outputTag, 
StreamRecord<X> streamRecord) {
+                  collect((StreamRecord) streamRecord);
                 }
-              }
 
-              @Override
-              public void close() {}
-            });
-      } catch (SuccessException e) {
+                @Override
+                public void emitLatencyMarker(LatencyMarker latencyMarker) {}
 
-        assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
+                @Override
+                public void collect(
+                    StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, 
Integer>>>>
+                        windowedValueStreamRecord) {
+                  numElementsReceived[0]++;
+                }
 
-        // success
-        return;
+                @Override
+                public void close() {}
+              });
+        } catch (SuccessException e) {
+          processingTimeUpdateThread.interrupt();
+          processingTimeUpdateThread.join();
+          // success, continue for the other subtask indices
+        }
       }
-      fail("Read terminated without producing expected number of outputs");
+      // verify that we get the expected count across all subtasks
+      assertEquals(numElementsPerShard * numSplits, numElementsReceived[0]);
+      // and that we get as many final watermarks as there are subtasks
+      assertEquals(numTasks, numWatermarksReceived[0]);
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 133615)
    Time Spent: 4.5h  (was: 4h 20m)

> IndexOutOfBoundsException when Flink parallelism > 1
> ----------------------------------------------------
>
>                 Key: BEAM-4798
>                 URL: https://issues.apache.org/jira/browse/BEAM-4798
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.4.0, 2.5.0
>            Reporter: Alexey Romanenko
>            Assignee: Alexey Romanenko
>            Priority: Major
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Running job on Flink in streaming mode and get data from a Kafka topic with 
> parallelism > 1 causes an exception:
> {noformat}
> Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>       at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>       at java.util.ArrayList.get(ArrayList.java:433)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>       at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It happens when number of Kafka topic partitions is less than value of 
> parallelism (number of task slots).
> So, workaround for now can be to set parallelism <= number of topic 
> partitions, thus if parallelism=2 then number_partitions >= 2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to