This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new b1966ca [ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
b1966ca is described below
commit b1966cac03f98833933a3f3749997baf5120bf1e
Author: Jeff Zhang <[email protected]>
AuthorDate: Wed Aug 26 11:38:01 2020 +0800
[ZEPPELIN-5016]. Flink interpreter is broken for flink 1.10.2
### What is this PR for?
A change in Flink 1.10.2 breaks the Flink interpreter.
This PR fixes the issue and also upgrades the Flink version to 1.10.2 in the pom and
integration tests.
### What type of PR is it?
[Bug Fix | Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* A change in Flink 1.10.2 breaks the Flink interpreter.
This PR fixes the issue and also upgrades the Flink version to 1.10.2 in the pom and
integration tests.
### How should this be tested?
* https://issues.apache.org/jira/browse/ZEPPELIN-5016
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <[email protected]>
Closes #3891 from zjffdu/ZEPPELIN-5016 and squashes the following commits:
e1eee58cb [Jeff Zhang] [ZEPPELIN-5016]. Flink interpreter is broken for
flink 1.10.2
---
.../main/java/org/apache/zeppelin/flink/Flink110Shims.java | 14 +++++++++++++-
flink/pom.xml | 2 +-
.../zeppelin/integration/FlinkIntegrationTest110.java | 2 +-
.../zeppelin/integration/ZeppelinFlinkClusterTest110.java | 2 +-
4 files changed, 16 insertions(+), 4 deletions(-)
diff --git
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 39ab6e0..fbb1379 100644
---
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.nio.file.Files;
import java.util.HashMap;
@@ -225,7 +227,17 @@ public class Flink110Shims extends FlinkShims {
@Override
public Object getCustomCli(Object cliFrontend, Object commandLine) {
- return ((CliFrontend)cliFrontend).getActiveCustomCommandLine((CommandLine)
commandLine);
+ try {
+ return ((CliFrontend)
cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
+ } catch (NoSuchMethodError e) {
+ try {
+ Method method =
CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class);
+ return method.invoke((CliFrontend) cliFrontend, commandLine);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException ex) {
+ LOGGER.error("Fail to call getCustomCli", ex);
+ throw new RuntimeException("Fail to call getCustomCli", ex);
+ }
+ }
}
@Override
diff --git a/flink/pom.xml b/flink/pom.xml
index 5e0ec61..3e489a1 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -42,7 +42,7 @@
</modules>
<properties>
- <flink1.10.version>1.10.1</flink1.10.version>
+ <flink1.10.version>1.10.2</flink1.10.version>
<flink1.11.version>1.11.1</flink1.11.version>
</properties>
diff --git
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
index b8cf293..7779d2f 100644
---
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
+++
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
@@ -29,7 +29,7 @@ public class FlinkIntegrationTest110 extends
FlinkIntegrationTest {
@Parameterized.Parameters
public static List<Object[]> data() {
return Arrays.asList(new Object[][]{
- {"1.10.1"}
+ {"1.10.2"}
});
}
diff --git
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
index 20479de..4400706 100644
---
a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
+++
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
@@ -29,7 +29,7 @@ public class ZeppelinFlinkClusterTest110 extends
ZeppelinFlinkClusterTest {
@Parameterized.Parameters
public static List<Object[]> data() {
return Arrays.asList(new Object[][]{
- {"1.10.1"}
+ {"1.10.2"}
});
}