[
https://issues.apache.org/jira/browse/BEAM-5267?focusedWorklogId=145272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145272
]
ASF GitHub Bot logged work on BEAM-5267:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Sep/18 12:20
Start Date: 18/Sep/18 12:20
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6331: [BEAM-5267] Make
Flink Runner compile compatible with Flink 1.6.0
URL: https://github.com/apache/beam/pull/6331
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index af953642d4b..ded53af267b 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -24,10 +24,13 @@
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/** Reads from a bounded source in streaming. */
-public class ReadSourceStreamingTest extends StreamingProgramTestBase {
+public class ReadSourceStreamingTest extends AbstractTestBase {
protected String resultDir;
protected String resultPath;
@@ -37,8 +40,8 @@ public ReadSourceStreamingTest() {}
private static final String[] EXPECTED_RESULT =
new String[] {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
- @Override
- protected void preSubmit() throws Exception {
+ @Before
+ public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
File resultParent = createAndRegisterTempFile("result");
@@ -46,13 +49,13 @@ protected void preSubmit() throws Exception {
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
- @Override
- protected void postSubmit() throws Exception {
+ @After
+ public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testProgram() throws Exception {
runProgram(resultPath);
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 58b301dbb2f..66600193f0b 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -33,12 +33,15 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/** Test for GroupByNullKey. */
-public class GroupByNullKeyTest extends StreamingProgramTestBase implements
Serializable {
+public class GroupByNullKeyTest extends AbstractTestBase implements
Serializable {
protected String resultDir;
protected String resultPath;
@@ -48,8 +51,8 @@
public GroupByNullKeyTest() {}
- @Override
- protected void preSubmit() throws Exception {
+ @Before
+ public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
File resultParent = createAndRegisterTempFile("result");
@@ -57,8 +60,8 @@ protected void preSubmit() throws Exception {
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
- @Override
- protected void postSubmit() throws Exception {
+ @After
+ public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
@@ -78,8 +81,8 @@ public void processElement(ProcessContext c) {
// suppress since toString() of Void is called and key is deliberately null
@SuppressWarnings("ObjectToString")
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testProgram() throws Exception {
Pipeline p = FlinkTestPipeline.createForStreaming();
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
index a6c0b165629..cce44babbd9 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
@@ -33,12 +33,15 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/** Session window test. */
-public class TopWikipediaSessionsTest extends StreamingProgramTestBase
implements Serializable {
+public class TopWikipediaSessionsTest extends AbstractTestBase implements
Serializable {
protected String resultDir;
protected String resultPath;
@@ -55,8 +58,8 @@ public TopWikipediaSessionsTest() {}
"user: user3 value:2"
};
- @Override
- protected void preSubmit() throws Exception {
+ @Before
+ public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
File resultParent = createAndRegisterTempFile("result");
@@ -64,13 +67,13 @@ protected void preSubmit() throws Exception {
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
- @Override
- protected void postSubmit() throws Exception {
+ @After
+ public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testProgram() throws Exception {
Pipeline p = FlinkTestPipeline.createForStreaming();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 145272)
Time Spent: 40m (was: 0.5h)
> Update Flink Runner to Flink 1.6.x
> ----------------------------------
>
> Key: BEAM-5267
> URL: https://issues.apache.org/jira/browse/BEAM-5267
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.8.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> For the next release, the Flink version should be bumped. As changes for
> 2.7.0 are already frozen, it's going to be 2.8.0.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)