zjffdu commented on a change in pull request #3818:
URL: https://github.com/apache/zeppelin/pull/3818#discussion_r444648764
##########
File path:
flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
##########
@@ -253,6 +254,67 @@ public void testResumeStreamSqlFromSavePoint() throws
IOException, InterpreterEx
resultMessages.get(0).getData().contains("url\tpv\n"));
}
+ @Test
+ public void testResumeStreamSqlFromExistSavePointPath() throws IOException,
InterpreterException, InterruptedException, TimeoutException {
+ String initStreamScalaScript = getInitStreamScript(1000);
+ InterpreterResult result =
flinkInterpreter.interpret(initStreamScalaScript,
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ File savePointDir = FileUtils.getTempDirectory();
+ final Waiter waiter = new Waiter();
+ Thread thread = new Thread(() -> {
+ try {
+ InterpreterContext context = getInterpreterContext();
+ context.getLocalProperties().put("type", "update");
+ context.getLocalProperties().put("savepointDir",
savePointDir.getAbsolutePath());
+ context.getLocalProperties().put("parallelism", "1");
+ context.getLocalProperties().put("maxParallelism", "10");
+ InterpreterResult result2 = sqlInterpreter.interpret("select url,
count(1) as pv from " +
+ "log group by url", context);
+ System.out.println("------------" + context.out.toString());
Review comment:
Don't use System.out here; use log4j instead.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org