[ 
https://issues.apache.org/jira/browse/BEAM-7442?focusedWorklogId=251515&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251515
 ]

ASF GitHub Bot logged work on BEAM-7442:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/May/19 06:01
            Start Date: 31/May/19 06:01
    Worklog Time Spent: 10m 
      Work Description: JozoVilcek commented on pull request #8715: 
[BEAM-7442][BEAM-5650] Read sequentially from bounded sources in 
UnboundedSourceWrapper
URL: https://github.com/apache/beam/pull/8715#discussion_r289264436
 
 

 ##########
 File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
 ##########
 @@ -690,6 +701,68 @@ private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) throws
       }
       assertThat(thread.isAlive(), is(false));
     }
+
+    @Test
+    public void testSequentialReadingFromBoundedSource() throws Exception {
+      UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter source =
+          new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
+              new BoundedIntegerSource(0, 1000));
+
+      FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      options.setShutdownSourcesOnFinalWatermark(true);
+
+      UnboundedSourceWrapper<Integer, TestCountingSource.CounterMark> sourceWrapper =
+          new UnboundedSourceWrapper<>("sequentialRead", options, source, 4);
+      StreamingRuntimeContext runtimeContextMock = Mockito.mock(StreamingRuntimeContext.class);
+
+      Mockito.when(runtimeContextMock.getIndexOfThisSubtask()).thenReturn(0);
+      when(runtimeContextMock.getNumberOfParallelSubtasks()).thenReturn(2);
+      when(runtimeContextMock.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+      TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+      processingTimeService.setCurrentTime(0);
+      when(runtimeContextMock.getProcessingTimeService()).thenReturn(processingTimeService);
+
+      when(runtimeContextMock.getMetricGroup()).thenReturn(new UnregisteredMetricsGroup());
+
+      sourceWrapper.setRuntimeContext(runtimeContextMock);
+
+      sourceWrapper.open(new Configuration());
+      assertThat(sourceWrapper.getLocalReaders().size(), is(2));
+
+      List<Integer> integers = new ArrayList<>();
+      sourceWrapper.run(
+          new SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<Integer>>>() {
+
+            @Override
+            public void collect(WindowedValue<ValueWithRecordId<Integer>> element) {
+              integers.add(element.getValue().getValue());
+            }
+
+            @Override
+            public void collectWithTimestamp(
+                WindowedValue<ValueWithRecordId<Integer>> element, long timestamp) {
+              throw new IllegalStateException("Should not collect with timestamp");
+            }
+
+            @Override
+            public void emitWatermark(Watermark mark) {}
+
+            @Override
+            public void markAsTemporarilyIdle() {}
+
+            @Override
+            public Object getCheckpointLock() {
+              return new Object();
+            }
+
+            @Override
+            public void close() {}
+          });
+
+      // the source is effectively split into two parts
+      assertThat(integers.size(), is(500));
 
 Review comment:
   From reading the code and the test, I expected the result to be the whole 
input (1000 integers), emitted not out of order, e.g. 
`[1,500,2,501,.... 1000, 499]`, but in sequence: `[1,2,3 ... 1000]`.
   What am I missing?
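
   To make the two orderings concrete, here is a minimal standalone sketch in plain Java. It is not Beam or Flink code: `SplitOrderSketch`, `range`, `interleaved`, and `sequential` are hypothetical names, and round-robin is only an assumed stand-in for how concurrently opened readers might mix their output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitOrderSketch {
  // Hypothetical helper: the integers in [from, to), standing in for one split.
  static List<Integer> range(int from, int to) {
    List<Integer> out = new ArrayList<>();
    for (int i = from; i < to; i++) {
      out.add(i);
    }
    return out;
  }

  // Round-robin over all splits at once: elements of different splits interleave.
  static List<Integer> interleaved(List<List<Integer>> splits) {
    List<Integer> out = new ArrayList<>();
    int longest = 0;
    for (List<Integer> s : splits) {
      longest = Math.max(longest, s.size());
    }
    for (int i = 0; i < longest; i++) {
      for (List<Integer> s : splits) {
        if (i < s.size()) {
          out.add(s.get(i));
        }
      }
    }
    return out;
  }

  // One split after another: each split is drained before the next one starts.
  static List<Integer> sequential(List<List<Integer>> splits) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> s : splits) {
      out.addAll(s);
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Integer>> splits = Arrays.asList(range(0, 3), range(3, 6));
    System.out.println(interleaved(splits)); // [0, 3, 1, 4, 2, 5]
    System.out.println(sequential(splits));  // [0, 1, 2, 3, 4, 5]
  }
}
```

   Both variants emit every element of every split; only the order differs.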
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 251515)
    Time Spent: 40m  (was: 0.5h)

> Bounded Reads for Flink Runner fails with OOM
> ---------------------------------------------
>
>                 Key: BEAM-7442
>                 URL: https://issues.apache.org/jira/browse/BEAM-7442
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Akshay Iyangar
>            Assignee: Akshay Iyangar
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of 
> files is huge, FlinkRunner throws an OOM error. This happens because the 
> current implementation doesn't read the files sequentially but 
> simultaneously, causing all of them to be held in memory, which quickly 
> breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class in a wrapper so 
> that when the stream comes from a bounded source, it is read sequentially 
> using a queue.
>  
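
The queue idea in the description can be sketched as follows, in plain Java rather than Beam's actual classes. `SequentialQueueReadSketch`, `FakeReader`, and `readSequentially` are hypothetical names, and the open-reader counter is only an illustration of the memory argument; the change in the pull request may be structured differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

public class SequentialQueueReadSketch {
  // Counters that track how many fake readers are open at the same time.
  static int openReaders = 0;
  static int maxOpenReaders = 0;

  // Hypothetical stand-in for a reader over one bounded split (e.g. one file).
  static class FakeReader implements AutoCloseable {
    private final Iterator<Integer> it;

    FakeReader(List<Integer> data) {
      this.it = data.iterator();
      openReaders++;
      maxOpenReaders = Math.max(maxOpenReaders, openReaders);
    }

    boolean hasNext() {
      return it.hasNext();
    }

    Integer next() {
      return it.next();
    }

    @Override
    public void close() {
      openReaders--;
    }
  }

  // Drains splits one at a time from a queue; a reader is opened only when
  // its split reaches the head of the queue and is closed before the next one.
  static List<Integer> readSequentially(List<List<Integer>> splits) {
    Queue<List<Integer>> pending = new ArrayDeque<>(splits);
    List<Integer> out = new ArrayList<>();
    while (!pending.isEmpty()) {
      try (FakeReader reader = new FakeReader(pending.poll())) {
        while (reader.hasNext()) {
          out.add(reader.next());
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Integer>> splits =
        Arrays.asList(Arrays.asList(0, 1), Arrays.asList(2, 3), Arrays.asList(4, 5));
    System.out.println(readSequentially(splits)); // [0, 1, 2, 3, 4, 5]
    System.out.println(maxOpenReaders);           // 1
  }
}
```

Opening all readers up front would instead make the peak number of open readers equal to the number of splits, which is the behaviour the issue blames for the OOM.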



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
