[
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Cullen updated FLINK-20995:
----------------------------------
Description:
An exception is thrown on this line:
{code:java}
try (CloseableIterator<Row> iterator = log_counts.execute().collect()) {
...
{code}
Here's the code snippet (see the full stack trace below):
{code:java}
// Imports needed by this snippet:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

...
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inStreamingMode().build();
final TableEnvironment tEnv = TableEnvironment.create(settings);

String ddl = "CREATE TABLE log_counts (\n" +
        "    msg_id STRING,\n" +
        "    hostname STRING,\n" +
        "    last_updated TIMESTAMP(3),\n" +
        "    WATERMARK FOR last_updated AS last_updated - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        "    'connector.type' = 'jdbc',\n" +
        "    'connector.url' = 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
        "    'connector.table' = 'chi_logger_intake',\n" +
        "    'connector.driver' = 'org.postgresql.Driver',\n" +
        "    'connector.username' = 'user',\n" +
        "    'connector.password' = 'password'\n" +
        ")";
tEnv.executeSql(ddl);

Table log_counts = tEnv.from("log_counts")
        .filter($("hostname").isNotNull()
                .and($("hostname").isNotEqual("")))
        .window(Tumble
                .over(lit(5).minutes())
                .on($("last_updated")).as("w"))
        .groupBy($("msg_id"), $("hostname"), $("w"))
        .select($("msg_id"),
                $("hostname"),
                $("msg_id").count().as("cnt"));

// The exception below is thrown by execute().collect().
try (CloseableIterator<Row> iterator = log_counts.execute().collect()) {
    final List<Row> materializedUpdates = new ArrayList<>();
    iterator.forEachRemaining(
            row -> {
                final RowKind kind = row.getKind();
                switch (kind) {
                    case INSERT:
                    case UPDATE_AFTER:
                        row.setKind(RowKind.INSERT); // for full equality
                        materializedUpdates.add(row);
                        break;
                    case UPDATE_BEFORE:
                    case DELETE:
                        row.setKind(RowKind.INSERT); // for full equality
                        materializedUpdates.remove(row);
                        break;
                }
            });
    // print the final materialized result (only meaningful if the input is bounded)
    materializedUpdates.forEach(System.out::println);
}
{code}
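A possible workaround sketch (my addition, not verified on this cluster): route the windowed result into a sink table instead of pulling it back with collect(), since collect() is the code path that hits the CoordinationRequestGateway check in the stack trace below. The sink table name (log_counts_out) and the use of the print connector are assumptions for illustration only.
{code:java}
// Hypothetical workaround sketch (Flink 1.12 Table API): write the result to a
// print sink instead of collecting it back to the client.
tEnv.executeSql(
        "CREATE TABLE log_counts_out (\n" +
        "    msg_id STRING,\n" +
        "    hostname STRING,\n" +
        "    cnt BIGINT\n" +
        ") WITH (\n" +
        "    'connector' = 'print'\n" +
        ")");

// executeInsert() submits the job without a client-side result iterator, so no
// CollectResultFetcher (and no job-client precondition) is involved.
log_counts.executeInsert("log_counts_out");
{code}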
Stack Trace:
{code:java}
2021-01-15 16:52:00,628 WARN
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring the
job submission via query parameters is deprecated. Please migrate to submitting
a JSON request instead.
2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - Starting
program (detached: true)
2021-01-15 16:52:00,678 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] -
Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
2021-01-15 16:52:00,678 INFO
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] -
Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:00,830 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph
submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
2021-01-15 16:52:00,830 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job
84c9f12fe943bc7f32ee637666ed3bc1 (collect).
2021-01-15 16:52:00,831 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService
[] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_68 .
2021-01-15 16:52:00,831 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Initializing job collect (84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:00,832 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Using restart back off time strategy NoRestartBackoffTimeStrategy for collect
(84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:00,832 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Running initialization on master for job collect
(84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:00,836 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Successfully ran initialization on master in 3 ms.
2021-01-15 16:52:00,836 INFO
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built
1 pipelined regions in 0 ms
2021-01-15 16:52:00,836 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Using job/cluster config to configure application-defined state backend: File
State Backend (checkpoints: 's3://flink/checkpoints', savepoints:
's3://flink/savepoints', asynchronous: TRUE, fileStateThreshold: 20480)
2021-01-15 16:52:00,836 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Using application-defined state backend: File State Backend (checkpoints:
's3://flink/checkpoints', savepoints: 's3://flink/savepoints', asynchronous:
TRUE, fileStateThreshold: 20480)
2021-01-15 16:52:06,865 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint
found during restore.
2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Using failover strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@39b9ad24
for collect (84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:06,866 INFO
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - JobManager runner
for job collect (84c9f12fe943bc7f32ee637666ed3bc1) was granted leadership with
session id 00000000-0000-0000-0000-000000000000 at
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68.
2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Starting execution of job collect (84c9f12fe943bc7f32ee637666ed3bc1) under job
master id 00000000000000000000000000000000.
2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Starting scheduling with scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-01-15 16:52:06,866 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect
(84c9f12fe943bc7f32ee637666ed3bc1) switched from state CREATED to RUNNING.
2021-01-15 16:52:06,866 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
JdbcTableSource(msg_id, hostname, last_updated) ->
SourceConversion(table=[default_catalog.default_database.log_counts, source:
[JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname,
last_updated]) -> WatermarkAssigner(rowtime=[last_updated],
watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id,
hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1)
(d22b3ac56f07e182ba5b74d68fa74fb1) switched from CREATED to SCHEDULED.
2021-01-15 16:52:06,866 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
GroupWindowAggregate(groupBy=[msg_id, hostname],
window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id,
hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS
cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1)
(09cee06206ad355b327cb8487773cd39) switched from CREATED to SCHEDULED.
2021-01-15 16:52:06,866 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Cannot serve slot
request, no ResourceManager connected. Adding as pending request
[SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}]
2021-01-15 16:52:06,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Connecting to ResourceManager
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-01-15 16:52:06,867 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Resolved ResourceManager address, beginning registration
2021-01-15 16:52:06,867 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering job manager
[email protected]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68
for job 84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:06,867 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registered job manager
[email protected]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68
for job 84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:06,867 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
JobManager successfully registered at ResourceManager, leader id:
00000000000000000000000000000000.
2021-01-15 16:52:06,867 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Requesting new
slot [SlotRequestId{b3f1b6b4eaab427305ec15f92a39531e}] and profile
ResourceProfile{UNKNOWN} with allocation id b0559997a428b1d31d9e57d6532e026b
from resource manager.
2021-01-15 16:52:06,868 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Request
slot with profile ResourceProfile{UNKNOWN} for job
84c9f12fe943bc7f32ee637666ed3bc1 with allocation id
b0559997a428b1d31d9e57d6532e026b.
2021-01-15 16:52:06,874 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
JdbcTableSource(msg_id, hostname, last_updated) ->
SourceConversion(table=[default_catalog.default_database.log_counts, source:
[JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname,
last_updated]) -> WatermarkAssigner(rowtime=[last_updated],
watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id,
hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1)
(d22b3ac56f07e182ba5b74d68fa74fb1) switched from SCHEDULED to DEPLOYING.
2021-01-15 16:52:06,882 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source:
JdbcTableSource(msg_id, hostname, last_updated) ->
SourceConversion(table=[default_catalog.default_database.log_counts, source:
[JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname,
last_updated]) -> WatermarkAssigner(rowtime=[last_updated],
watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id,
hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1) (attempt #0)
with attempt id d22b3ac56f07e182ba5b74d68fa74fb1 to 10.42.1.148:6122-9b9553 @
10.42.1.148 (dataPort=40391) with allocation id b0559997a428b1d31d9e57d6532e026b
2021-01-15 16:52:06,883 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
GroupWindowAggregate(groupBy=[msg_id, hostname],
window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id,
hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS
cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1)
(09cee06206ad355b327cb8487773cd39) switched from SCHEDULED to DEPLOYING.
2021-01-15 16:52:06,883 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying
GroupWindowAggregate(groupBy=[msg_id, hostname],
window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id,
hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS
cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1) (attempt #0)
with attempt id 09cee06206ad355b327cb8487773cd39 to 10.42.1.148:6122-9b9553 @
10.42.1.148 (dataPort=40391) with allocation id b0559997a428b1d31d9e57d6532e026b
2021-01-15 16:52:07,038 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
GroupWindowAggregate(groupBy=[msg_id, hostname],
window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id,
hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS
cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1)
(09cee06206ad355b327cb8487773cd39) switched from DEPLOYING to RUNNING.
2021-01-15 16:52:07,038 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
JdbcTableSource(msg_id, hostname, last_updated) ->
SourceConversion(table=[default_catalog.default_database.log_counts, source:
[JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname,
last_updated]) -> WatermarkAssigner(rowtime=[last_updated],
watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id,
hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1)
(d22b3ac56f07e182ba5b74d68fa74fb1) switched from DEPLOYING to RUNNING.
2021-01-15 16:52:07,057 INFO
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator
[] - Received sink socket server address: /10.42.1.148:39303
2021-01-15 16:52:07,060 WARN
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could
be resolved for the IP address 10.42.1.148, using IP address as host name.
Local input split assignment (such as for HDFS files) may be impacted.
2021-01-15 16:52:07,988 WARN
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] -
Could not execute application:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
[?:1.8.0_275]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_275]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_275]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:719)
~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570)
~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_275]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_275]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_275]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 13 more
Caused by: java.lang.IllegalArgumentException: Job client must be a
CoordinationRequestGateway. This is a bug.
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:142)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:95)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:98)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.table.planner.sinks.SelectTableSinkBase$1.setJobClient(SelectTableSinkBase.java:93)
~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:709)
~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570)
~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at io.cmdaa.streaming.java.jdbc.TableConsumer.main(TableConsumer.java:192)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_275]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_275]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_275]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
~[flink-dist_2.12-1.12.0.jar:1.12.0]
... 13 more
2021-01-15 16:52:07,989 ERROR
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception
occurred in REST handler: Could not execute application.
2021-01-15 16:52:08,462 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
JdbcTableSource(msg_id, hostname, last_updated) ->
SourceConversion(table=[default_catalog.default_database.log_counts, source:
[JdbcTableSource(msg_id, hostname, last_updated)]], fields=[msg_id, hostname,
last_updated]) -> WatermarkAssigner(rowtime=[last_updated],
watermark=[(last_updated - 5000:INTERVAL SECOND)]) -> Calc(select=[msg_id,
hostname, last_updated], where=[(hostname <> _UTF-16LE'')]) (1/1)
(d22b3ac56f07e182ba5b74d68fa74fb1) switched from RUNNING to FINISHED.
2021-01-15 16:52:08,465 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
GroupWindowAggregate(groupBy=[msg_id, hostname],
window=[TumblingGroupWindow('w, last_updated, 300000)], select=[msg_id,
hostname, COUNT(msg_id) AS EXPR$0]) -> Calc(select=[msg_id, hostname, EXPR$0 AS
cnt]) -> SinkConversionToTuple2 -> Sink: Select table sink (1/1)
(09cee06206ad355b327cb8487773cd39) switched from RUNNING to FINISHED.
2021-01-15 16:52:08,465 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect
(84c9f12fe943bc7f32ee637666ed3bc1) switched from state RUNNING to FINISHED.
2021-01-15 16:52:08,466 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping
checkpoint coordinator for job 84c9f12fe943bc7f32ee637666ed3bc1.
2021-01-15 16:52:08,466 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] -
Shutting down
2021-01-15 16:52:08,466 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
84c9f12fe943bc7f32ee637666ed3bc1 reached globally terminal state FINISHED.
2021-01-15 16:52:08,467 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Stopping the JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1).
2021-01-15 16:52:08,467 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending
SlotPool.
2021-01-15 16:52:08,468 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
Close ResourceManager connection a76a3c5321498f13d0552421928c6062: Stopping
JobMaster for job collect(84c9f12fe943bc7f32ee637666ed3bc1)..
2021-01-15 16:52:08,468 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping SlotPool.
2021-01-15 16:52:08,468 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Disconnect job manager
[email protected]://flink@flink-jobmanager:6123/user/rpc/jobmanager_68
for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource manager.
{code}
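To help narrow this down, here is a minimal, self-contained sketch (my addition, untested) that keeps the same execute().collect() pattern but swaps the JDBC source for the built-in datagen connector. If this also fails with the IllegalArgumentException above when submitted through the web UI, the problem lies in the collect path rather than in the JDBC connector.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectRepro {
    public static void main(String[] args) throws Exception {
        final EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);

        // Bounded datagen source so the job finishes on its own.
        tEnv.executeSql(
                "CREATE TABLE gen (\n" +
                "    msg_id STRING,\n" +
                "    hostname STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    'number-of-rows' = '10'\n" +
                ")");

        // Same call that fails in the report: the collect sink requires the job
        // client to be a CoordinationRequestGateway.
        try (CloseableIterator<Row> it = tEnv.from("gen").execute().collect()) {
            it.forEachRemaining(System.out::println);
        }
    }
}
{code}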
> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> ----------------------------------------------------------------------
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
> Reporter: Robert Cullen
> Priority: Blocker
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)