lsyldliu commented on code in PR #24866:
URL: https://github.com/apache/flink/pull/24866#discussion_r1620385497
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -640,10 +750,15 @@ private static byte[]
serializeContinuousHandler(ContinuousRefreshHandler refres
}
}
- private static ResolvedCatalogMaterializedTable
getCatalogMaterializedTable(
+ private ResolvedCatalogMaterializedTable getCatalogMaterializedTable(
OperationExecutor operationExecutor, ObjectIdentifier
tableIdentifier) {
ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
- operationExecutor.getTable(tableIdentifier);
+ operationExecutor
Review Comment:
Why do you need to change this code snippet? Original logical can't satisfy
your needs?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -165,7 +215,73 @@ private static void createMaterializedInContinuousMode(
}
}
- private static ResultFetcher callAlterMaterializedTableSuspend(
+ private void createMaterializedInFullMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ CreateMaterializedTableOperation createMaterializedTableOperation)
{
+ if (workflowScheduler == null) {
+ throw new SqlExecutionException(
+ "Scheduler is not configured, can't create materialized
table in full mode.");
+ }
+ // create materialized table first
+ operationExecutor.callExecutableOperation(handle,
createMaterializedTableOperation);
+
+ ObjectIdentifier materializedTableIdentifier =
+ createMaterializedTableOperation.getTableIdentifier();
+ CatalogMaterializedTable catalogMaterializedTable =
+ createMaterializedTableOperation.getCatalogMaterializedTable();
+
+ // convert duration to cron expression
+ String cronExpression =
+
convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness());
+ // create full refresh job
+ CreateRefreshWorkflow createRefreshWorkflow =
+ new CreatePeriodicRefreshWorkflow(
+ materializedTableIdentifier,
+ catalogMaterializedTable.getDefinitionQuery(),
+ cronExpression,
+ catalogMaterializedTable.getOptions(),
+ Collections.emptyMap(),
+ restEndpointUrl);
+
+ try {
+ RefreshHandler refreshHandler =
+
workflowScheduler.createRefreshWorkflow(createRefreshWorkflow);
+ RefreshHandlerSerializer refreshHandlerSerializer =
+ workflowScheduler.getRefreshHandlerSerializer();
+ byte[] serializedRefreshHandler =
refreshHandlerSerializer.serialize(refreshHandler);
+
+ CatalogMaterializedTable updatedMaterializedTable =
Review Comment:
We can extract a util method as follows:
```
private void updateRefreshHandler(
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier materializedTableIdentifier,
CatalogMaterializedTable catalogMaterializedTable,
String refreshHandlerSummary,
byte[] refreshHandlerBytes) {
CatalogMaterializedTable updatedMaterializedTable =
catalogMaterializedTable.copy(
CatalogMaterializedTable.RefreshStatus.ACTIVATED,
refreshHandlerSummary,
refreshHandlerBytes);
List<TableChange> tableChanges = new ArrayList<>();
tableChanges.add(
TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
tableChanges.add(
TableChange.modifyRefreshHandler(refreshHandlerSummary,
refreshHandlerBytes));
AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
materializedTableIdentifier, tableChanges,
updatedMaterializedTable);
// update RefreshHandler to Catalog
operationExecutor.callExecutableOperation(handle,
alterMaterializedTableChangeOperation);
}
```
This method and `executeContinuousRefreshJob` can both reuse it.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -165,7 +215,73 @@ private static void createMaterializedInContinuousMode(
}
}
- private static ResultFetcher callAlterMaterializedTableSuspend(
+ private void createMaterializedInFullMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ CreateMaterializedTableOperation createMaterializedTableOperation)
{
+ if (workflowScheduler == null) {
+ throw new SqlExecutionException(
+ "Scheduler is not configured, can't create materialized
table in full mode.");
+ }
+ // create materialized table first
+ operationExecutor.callExecutableOperation(handle,
createMaterializedTableOperation);
+
+ ObjectIdentifier materializedTableIdentifier =
+ createMaterializedTableOperation.getTableIdentifier();
+ CatalogMaterializedTable catalogMaterializedTable =
+ createMaterializedTableOperation.getCatalogMaterializedTable();
+
+ // convert duration to cron expression
+ String cronExpression =
+
convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness());
+ // create full refresh job
+ CreateRefreshWorkflow createRefreshWorkflow =
+ new CreatePeriodicRefreshWorkflow(
+ materializedTableIdentifier,
+ catalogMaterializedTable.getDefinitionQuery(),
+ cronExpression,
+ catalogMaterializedTable.getOptions(),
+ Collections.emptyMap(),
+ restEndpointUrl);
+
+ try {
+ RefreshHandler refreshHandler =
+
workflowScheduler.createRefreshWorkflow(createRefreshWorkflow);
+ RefreshHandlerSerializer refreshHandlerSerializer =
+ workflowScheduler.getRefreshHandlerSerializer();
+ byte[] serializedRefreshHandler =
refreshHandlerSerializer.serialize(refreshHandler);
+
+ CatalogMaterializedTable updatedMaterializedTable =
+ catalogMaterializedTable.copy(
+ CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+ refreshHandler.asSummaryString(),
+ serializedRefreshHandler);
+ List<TableChange> tableChanges = new ArrayList<>();
+ tableChanges.add(
+ TableChange.modifyRefreshStatus(
+ CatalogMaterializedTable.RefreshStatus.ACTIVATED));
+ tableChanges.add(
+ TableChange.modifyRefreshHandler(
+ refreshHandler.asSummaryString(),
serializedRefreshHandler));
+ AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
+ new AlterMaterializedTableChangeOperation(
+ materializedTableIdentifier, tableChanges,
updatedMaterializedTable);
+ operationExecutor.callExecutableOperation(
+ handle, alterMaterializedTableChangeOperation);
+ } catch (Exception e) {
+ // drop materialized table while submit flink streaming job occur
exception. Thus, weak
+ // atomicity is guaranteed
+ LOG.warn(
+ "Create refresh workflow occur exception, drop
materialized table {}.",
+ materializedTableIdentifier,
+ e);
+ operationExecutor.callExecutableOperation(
+ handle, new
DropMaterializedTableOperation(materializedTableIdentifier, true));
+ throw new SqlExecutionException("Failed to create refresh
workflow.", e);
Review Comment:
```suggestion
throw new SqlExecutionException(
String.format(
"Failed to create refresh workflow for
materialized table %s.",
materializedTableIdentifier),
e);
```
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
+ private final String address;
+
+ private final RestClient restClient;
+
+ public SqlGatewayRestClient(String endpointUrl) throws Exception {
+ URL url = new URL(endpointUrl);
+ this.address = url.getHost();
+ this.port = url.getPort();
+ this.restClient =
+ RestClient.forUrl(
+ new Configuration(),
Executors.newFixedThreadPool(1), url);
+ }
+
+ public SessionHandle openSession(String sessionName) throws
Exception {
+ Map<String, String> properties = Collections.emptyMap();
+
+ OpenSessionRequestBody requestBody =
+ new OpenSessionRequestBody(sessionName, properties);
+ OpenSessionHeaders headers = OpenSessionHeaders.getInstance();
+
+ OpenSessionResponseBody responseBody =
+ restClient
+ .sendRequest(
+ address,
+ port,
+ headers,
+ EmptyMessageParameters.getInstance(),
+ requestBody)
+ .get();
+
+ return new
SessionHandle(UUID.fromString(responseBody.getSessionHandle()));
+ }
+
+ public void closeSession(SessionHandle sessionHandle) throws
Exception {
+ // Close session
+ CloseSessionHeaders closeSessionHeaders =
CloseSessionHeaders.getInstance();
+ SessionMessageParameters sessionMessageParameters =
+ new SessionMessageParameters(sessionHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeSessionHeaders,
+ sessionMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public void closeOperation(SessionHandle sessionHandle,
OperationHandle operationHandle)
+ throws Exception {
+ // Close operation
+ CloseOperationHeaders closeOperationHeaders =
CloseOperationHeaders.getInstance();
+ OperationMessageParameters operationMessageParameters =
+ new OperationMessageParameters(sessionHandle,
operationHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeOperationHeaders,
+ operationMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public OperationHandle refreshMaterializedTable(
+ SessionHandle sessionHandle,
+ String materializedTableIdentifier,
+ boolean ignoreFailures,
Review Comment:
Why need this boolean flag, it always true here.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
+ private final String address;
+
+ private final RestClient restClient;
+
+ public SqlGatewayRestClient(String endpointUrl) throws Exception {
+ URL url = new URL(endpointUrl);
+ this.address = url.getHost();
+ this.port = url.getPort();
+ this.restClient =
+ RestClient.forUrl(
+ new Configuration(),
Executors.newFixedThreadPool(1), url);
+ }
+
+ public SessionHandle openSession(String sessionName) throws
Exception {
+ Map<String, String> properties = Collections.emptyMap();
+
+ OpenSessionRequestBody requestBody =
+ new OpenSessionRequestBody(sessionName, properties);
+ OpenSessionHeaders headers = OpenSessionHeaders.getInstance();
+
+ OpenSessionResponseBody responseBody =
+ restClient
+ .sendRequest(
+ address,
+ port,
+ headers,
+ EmptyMessageParameters.getInstance(),
+ requestBody)
+ .get();
+
+ return new
SessionHandle(UUID.fromString(responseBody.getSessionHandle()));
+ }
+
+ public void closeSession(SessionHandle sessionHandle) throws
Exception {
+ // Close session
+ CloseSessionHeaders closeSessionHeaders =
CloseSessionHeaders.getInstance();
+ SessionMessageParameters sessionMessageParameters =
+ new SessionMessageParameters(sessionHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeSessionHeaders,
+ sessionMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public void closeOperation(SessionHandle sessionHandle,
OperationHandle operationHandle)
+ throws Exception {
+ // Close operation
+ CloseOperationHeaders closeOperationHeaders =
CloseOperationHeaders.getInstance();
+ OperationMessageParameters operationMessageParameters =
+ new OperationMessageParameters(sessionHandle,
operationHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeOperationHeaders,
+ operationMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public OperationHandle refreshMaterializedTable(
+ SessionHandle sessionHandle,
+ String materializedTableIdentifier,
+ boolean ignoreFailures,
+ String schedulerTime,
+ Map<String, String> dynamicOptions,
+ Map<String, String> staticPartitions,
+ Map<String, String> executionConfig)
+ throws Exception {
+
+ RefreshMaterializedTableRequestBody requestBody =
+ new RefreshMaterializedTableRequestBody(
+ true,
+ schedulerTime,
+ dynamicOptions,
+ staticPartitions,
+ executionConfig);
+ RefreshMaterializedTableHeaders headers =
+ RefreshMaterializedTableHeaders.getInstance();
+ RefreshMaterializedTableParameters parameters =
+ new RefreshMaterializedTableParameters(
+ sessionHandle, materializedTableIdentifier);
+
+ RefreshMaterializedTableResponseBody responseBody =
+ restClient
+ .sendRequest(address, port, headers,
parameters, requestBody)
+ .get();
+
+ return new
OperationHandle(UUID.fromString(responseBody.getOperationHandle()));
+ }
+
+ public List<RowData> fetchOperationAllResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle) throws Exception {
+ Long token = 0L;
+ List<RowData> results = new ArrayList<>();
+ while (token != null) {
+ FetchResultsResponseBody responseBody =
+ fetchOperationResults(sessionHandle,
operationHandle, token);
+ if (responseBody instanceof NotReadyFetchResultResponse) {
+ continue;
+ }
+ responseBody.getNextResultUri();
+ results.addAll(responseBody.getResults().getData());
+ token =
SqlGatewayRestEndpointUtils.parseToken(responseBody.getNextResultUri());
+ }
+ return results;
+ }
+
+ public FetchResultsResponseBody fetchOperationResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle, Long token)
+ throws Exception {
+ FetchResultsMessageParameters fetchResultsMessageParameters =
+ new FetchResultsMessageParameters(
+ sessionHandle, operationHandle, token,
RowFormat.PLAIN_TEXT);
Review Comment:
Why it is not `RowFormat.JSON`?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
Review Comment:
`Successfully execute refresh operation for materialized table: {} with job
id: {}.` ?
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -158,36 +164,107 @@ void testCreateMaterializedTableInContinuousMode()
throws Exception {
}
@Test
- void testCreateMaterializedTableInFullMode() {
+ void testCreateMaterializedTableInFullMode() throws Exception {
+ String dataId =
TestValuesTableFactory.registerData(Collections.emptyList());
+ String sourceDdl =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS my_source (\n"
+ + " order_id BIGINT,\n"
+ + " user_id BIGINT,\n"
+ + " shop_id BIGINT,\n"
+ + " order_created_at STRING\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true',\n"
+ + " 'data-id' = '%s'\n"
+ + ")",
+ dataId);
+
+ OperationHandle sourceHandle =
+ service.executeStatement(sessionHandle, sourceDdl, -1, new
Configuration());
+ awaitOperationTermination(service, sessionHandle, sourceHandle);
+
String materializedTableDDL =
"CREATE MATERIALIZED TABLE users_shops"
+ " PARTITIONED BY (ds)\n"
+ " WITH(\n"
+ + " 'partition.fields.ds.date-formatter' =
'yyyy-MM-dd',\n"
+ " 'format' = 'debezium-json'\n"
+ " )\n"
- + " FRESHNESS = INTERVAL '1' DAY\n"
+ + " FRESHNESS = INTERVAL '1' MINUTE\n"
+ " AS SELECT \n"
+ " user_id,\n"
+ " shop_id,\n"
+ " ds,\n"
- + " SUM (payment_amount_cents) AS
payed_buy_fee_sum,\n"
- + " SUM (1) AS pv\n"
+ + " COUNT(order_id) AS order_cnt\n"
+ " FROM (\n"
- + " SELECT user_id, shop_id,
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM
datagenSource"
+ + " SELECT user_id, shop_id, order_created_at AS
ds, order_id FROM my_source"
+ " ) AS tmp\n"
+ " GROUP BY (user_id, shop_id, ds)";
+ Configuration configuration = new Configuration();
+ configuration.set(
+
MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD,
+ Duration.ofSeconds(30));
OperationHandle materializedTableHandle =
- service.executeStatement(
- sessionHandle, materializedTableDDL, -1, new
Configuration());
+ service.executeStatement(sessionHandle, materializedTableDDL,
-1, configuration);
+ awaitOperationTermination(service, sessionHandle,
materializedTableHandle);
- assertThatThrownBy(
- () ->
- awaitOperationTermination(
- service, sessionHandle,
materializedTableHandle))
- .rootCause()
- .isInstanceOf(SqlExecutionException.class)
- .hasMessage(
- "Only support create materialized table in continuous
refresh mode currently.");
+ // verify materialized table is created
+ ResolvedCatalogMaterializedTable actualMaterializedTable =
+ (ResolvedCatalogMaterializedTable)
+ service.getTable(
+ sessionHandle,
+ ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops"));
+
+ // verify refresh mode
+ assertThat(actualMaterializedTable.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+
+ // verify refresh handler
+ byte[] serializedHandler =
actualMaterializedTable.getSerializedRefreshHandler();
+ EmbeddedRefreshHandler embeddedRefreshHandler =
+ EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+ serializedHandler, getClass().getClassLoader());
+ assertThat(embeddedRefreshHandler.getWorkflowName())
+ .isEqualTo(
+ "quartz_job_"
+ + ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops")
+ .asSerializableString());
+
+ EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+ SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+ .getSqlGatewayRestEndpoint()
+ .getQuartzScheduler();
+ JobKey jobKey =
+ new JobKey(
+ embeddedRefreshHandler.getWorkflowName(),
+ embeddedRefreshHandler.getWorkflowGroup());
+
+ // wait scheduler to start
+ CommonTestUtils.waitUtil(
+ () -> {
+ try {
+ return
embeddedWorkflowScheduler.getQuartzScheduler().isStarted();
Review Comment:
This wait goal is what? The scheduler is started after SqlGateway starts.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -165,7 +215,73 @@ private static void createMaterializedInContinuousMode(
}
}
- private static ResultFetcher callAlterMaterializedTableSuspend(
+ private void createMaterializedInFullMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ CreateMaterializedTableOperation createMaterializedTableOperation)
{
+ if (workflowScheduler == null) {
+ throw new SqlExecutionException(
+ "Scheduler is not configured, can't create materialized
table in full mode.");
Review Comment:
The workflow scheduler must be configured when creating materialized table
in full refresh mode.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -165,7 +215,73 @@ private static void createMaterializedInContinuousMode(
}
}
- private static ResultFetcher callAlterMaterializedTableSuspend(
+ private void createMaterializedInFullMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ CreateMaterializedTableOperation createMaterializedTableOperation)
{
+ if (workflowScheduler == null) {
+ throw new SqlExecutionException(
+ "Scheduler is not configured, can't create materialized
table in full mode.");
+ }
+ // create materialized table first
+ operationExecutor.callExecutableOperation(handle,
createMaterializedTableOperation);
+
+ ObjectIdentifier materializedTableIdentifier =
+ createMaterializedTableOperation.getTableIdentifier();
+ CatalogMaterializedTable catalogMaterializedTable =
+ createMaterializedTableOperation.getCatalogMaterializedTable();
+
+ // convert duration to cron expression
+ String cronExpression =
+
convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness());
+ // create full refresh job
+ CreateRefreshWorkflow createRefreshWorkflow =
+ new CreatePeriodicRefreshWorkflow(
+ materializedTableIdentifier,
+ catalogMaterializedTable.getDefinitionQuery(),
+ cronExpression,
+ catalogMaterializedTable.getOptions(),
+ Collections.emptyMap(),
+ restEndpointUrl);
+
+ try {
+ RefreshHandler refreshHandler =
+
workflowScheduler.createRefreshWorkflow(createRefreshWorkflow);
+ RefreshHandlerSerializer refreshHandlerSerializer =
+ workflowScheduler.getRefreshHandlerSerializer();
+ byte[] serializedRefreshHandler =
refreshHandlerSerializer.serialize(refreshHandler);
+
+ CatalogMaterializedTable updatedMaterializedTable =
+ catalogMaterializedTable.copy(
+ CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+ refreshHandler.asSummaryString(),
+ serializedRefreshHandler);
+ List<TableChange> tableChanges = new ArrayList<>();
+ tableChanges.add(
+ TableChange.modifyRefreshStatus(
+ CatalogMaterializedTable.RefreshStatus.ACTIVATED));
+ tableChanges.add(
+ TableChange.modifyRefreshHandler(
+ refreshHandler.asSummaryString(),
serializedRefreshHandler));
+ AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
+ new AlterMaterializedTableChangeOperation(
+ materializedTableIdentifier, tableChanges,
updatedMaterializedTable);
+ operationExecutor.callExecutableOperation(
+ handle, alterMaterializedTableChangeOperation);
+ } catch (Exception e) {
+ // drop materialized table while submit flink streaming job occur
exception. Thus, weak
Review Comment:
// drop materialized table while create refresh workflow occur exception.
Thus, weak
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -165,7 +215,73 @@ private static void createMaterializedInContinuousMode(
}
}
- private static ResultFetcher callAlterMaterializedTableSuspend(
+ private void createMaterializedInFullMode(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ CreateMaterializedTableOperation createMaterializedTableOperation)
{
+ if (workflowScheduler == null) {
+ throw new SqlExecutionException(
+ "Scheduler is not configured, can't create materialized
table in full mode.");
+ }
+ // create materialized table first
+ operationExecutor.callExecutableOperation(handle,
createMaterializedTableOperation);
+
+ ObjectIdentifier materializedTableIdentifier =
+ createMaterializedTableOperation.getTableIdentifier();
+ CatalogMaterializedTable catalogMaterializedTable =
+ createMaterializedTableOperation.getCatalogMaterializedTable();
+
+ // convert duration to cron expression
+ String cronExpression =
+
convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness());
+ // create full refresh job
+ CreateRefreshWorkflow createRefreshWorkflow =
+ new CreatePeriodicRefreshWorkflow(
Review Comment:
The correct code should be:
```
CreateRefreshWorkflow createRefreshWorkflow =
new CreatePeriodicRefreshWorkflow(
materializedTableIdentifier,
catalogMaterializedTable.getDefinitionQuery(),
cronExpression,
operationExecutor.getSessionContext().getSessionConf().toMap(),
restEndpointUrl);
```
1. The `dynamicOptions` is designed for `ALTER MATERIALIZED TABLE xxx RESUME
WITH('key'='val')`, only the `'key'='val'` represent the dynamic options, so
it should be the member variable of `ResumeRefreshWorkflow`,
`CreatePeriodicRefreshWorkflow` doesn't need this variable, so we should remove
it, this is due to my oversight, sorry.
2. The exuctionConfig should be the `sessionConf`, these config options are
user set, so it should work when executing batch refresh job.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
Review Comment:
This restClient object is no used.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
+ private final String address;
+
+ private final RestClient restClient;
+
+ public SqlGatewayRestClient(String endpointUrl) throws Exception {
+ URL url = new URL(endpointUrl);
+ this.address = url.getHost();
+ this.port = url.getPort();
+ this.restClient =
+ RestClient.forUrl(
+ new Configuration(),
Executors.newFixedThreadPool(1), url);
+ }
+
+ public SessionHandle openSession(String sessionName) throws
Exception {
+ Map<String, String> properties = Collections.emptyMap();
+
+ OpenSessionRequestBody requestBody =
+ new OpenSessionRequestBody(sessionName, properties);
+ OpenSessionHeaders headers = OpenSessionHeaders.getInstance();
+
+ OpenSessionResponseBody responseBody =
+ restClient
+ .sendRequest(
+ address,
+ port,
+ headers,
+ EmptyMessageParameters.getInstance(),
+ requestBody)
+ .get();
+
+ return new
SessionHandle(UUID.fromString(responseBody.getSessionHandle()));
+ }
+
+ public void closeSession(SessionHandle sessionHandle) throws
Exception {
+ // Close session
+ CloseSessionHeaders closeSessionHeaders =
CloseSessionHeaders.getInstance();
+ SessionMessageParameters sessionMessageParameters =
+ new SessionMessageParameters(sessionHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeSessionHeaders,
+ sessionMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public void closeOperation(SessionHandle sessionHandle,
OperationHandle operationHandle)
+ throws Exception {
+ // Close operation
+ CloseOperationHeaders closeOperationHeaders =
CloseOperationHeaders.getInstance();
+ OperationMessageParameters operationMessageParameters =
+ new OperationMessageParameters(sessionHandle,
operationHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeOperationHeaders,
+ operationMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public OperationHandle refreshMaterializedTable(
+ SessionHandle sessionHandle,
+ String materializedTableIdentifier,
+ boolean ignoreFailures,
+ String schedulerTime,
+ Map<String, String> dynamicOptions,
+ Map<String, String> staticPartitions,
+ Map<String, String> executionConfig)
+ throws Exception {
+
+ RefreshMaterializedTableRequestBody requestBody =
+ new RefreshMaterializedTableRequestBody(
+ true,
+ schedulerTime,
+ dynamicOptions,
+ staticPartitions,
+ executionConfig);
+ RefreshMaterializedTableHeaders headers =
+ RefreshMaterializedTableHeaders.getInstance();
+ RefreshMaterializedTableParameters parameters =
+ new RefreshMaterializedTableParameters(
+ sessionHandle, materializedTableIdentifier);
+
+ RefreshMaterializedTableResponseBody responseBody =
+ restClient
+ .sendRequest(address, port, headers,
parameters, requestBody)
+ .get();
+
+ return new
OperationHandle(UUID.fromString(responseBody.getOperationHandle()));
+ }
+
+ public List<RowData> fetchOperationAllResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle) throws Exception {
+ Long token = 0L;
+ List<RowData> results = new ArrayList<>();
+ while (token != null) {
+ FetchResultsResponseBody responseBody =
+ fetchOperationResults(sessionHandle,
operationHandle, token);
+ if (responseBody instanceof NotReadyFetchResultResponse) {
+ continue;
+ }
+ responseBody.getNextResultUri();
+ results.addAll(responseBody.getResults().getData());
+ token =
SqlGatewayRestEndpointUtils.parseToken(responseBody.getNextResultUri());
+ }
+ return results;
+ }
+
+ public FetchResultsResponseBody fetchOperationResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle, Long token)
+ throws Exception {
+ FetchResultsMessageParameters fetchResultsMessageParameters =
+ new FetchResultsMessageParameters(
+ sessionHandle, operationHandle, token,
RowFormat.PLAIN_TEXT);
Review Comment:
Why it is not `RowFormat.JSON`?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
+ private final String address;
+
+ private final RestClient restClient;
+
+ public SqlGatewayRestClient(String endpointUrl) throws Exception {
+ URL url = new URL(endpointUrl);
+ this.address = url.getHost();
+ this.port = url.getPort();
+ this.restClient =
+ RestClient.forUrl(
+ new Configuration(),
Executors.newFixedThreadPool(1), url);
+ }
+
+ public SessionHandle openSession(String sessionName) throws
Exception {
Review Comment:
The access modifiers can all be changed to private
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
Review Comment:
public -> private
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
Review Comment:
It would be better:
```
private final String address;
private final int port;
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java:
##########
@@ -99,9 +99,7 @@ public Operation convertSqlNode(
// it from rootConfiguration instead of table config
CatalogMaterializedTable.RefreshMode refreshMode =
MaterializedTableUtils.deriveRefreshMode(
- context.getTableConfig()
- .getRootConfiguration()
- .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD),
Review Comment:
I don't think we should change this operation scope, the freshness threshold
should be scoped by SQL Gateway, not session granularity. Otherwise, the user
can modify the threshold arbitrarily, and other subsequent operations on the
same materialized table may lead to results that do not meet expectations, such
as modifying the Freshness. For your testing needs, you can achieve this when
creating the Materialized Table by specifying `REFRESH_ MODE = FULL` when
creating the Materialized Table.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
+ private final String address;
+
+ private final RestClient restClient;
+
+ public SqlGatewayRestClient(String endpointUrl) throws Exception {
+ URL url = new URL(endpointUrl);
+ this.address = url.getHost();
+ this.port = url.getPort();
+ this.restClient =
+ RestClient.forUrl(
+ new Configuration(),
Executors.newFixedThreadPool(1), url);
Review Comment:
To reduce the thread count impact on SQL Gateway, we should use
`Executors.directExecutor()` here.
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -158,36 +164,107 @@ void testCreateMaterializedTableInContinuousMode()
throws Exception {
}
@Test
- void testCreateMaterializedTableInFullMode() {
+ void testCreateMaterializedTableInFullMode() throws Exception {
+ String dataId =
TestValuesTableFactory.registerData(Collections.emptyList());
+ String sourceDdl =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS my_source (\n"
+ + " order_id BIGINT,\n"
+ + " user_id BIGINT,\n"
+ + " shop_id BIGINT,\n"
+ + " order_created_at STRING\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true',\n"
+ + " 'data-id' = '%s'\n"
+ + ")",
+ dataId);
+
+ OperationHandle sourceHandle =
+ service.executeStatement(sessionHandle, sourceDdl, -1, new
Configuration());
+ awaitOperationTermination(service, sessionHandle, sourceHandle);
+
String materializedTableDDL =
"CREATE MATERIALIZED TABLE users_shops"
+ " PARTITIONED BY (ds)\n"
+ " WITH(\n"
+ + " 'partition.fields.ds.date-formatter' =
'yyyy-MM-dd',\n"
+ " 'format' = 'debezium-json'\n"
+ " )\n"
- + " FRESHNESS = INTERVAL '1' DAY\n"
+ + " FRESHNESS = INTERVAL '1' MINUTE\n"
+ " AS SELECT \n"
+ " user_id,\n"
+ " shop_id,\n"
+ " ds,\n"
- + " SUM (payment_amount_cents) AS
payed_buy_fee_sum,\n"
- + " SUM (1) AS pv\n"
+ + " COUNT(order_id) AS order_cnt\n"
+ " FROM (\n"
- + " SELECT user_id, shop_id,
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM
datagenSource"
+ + " SELECT user_id, shop_id, order_created_at AS
ds, order_id FROM my_source"
+ " ) AS tmp\n"
+ " GROUP BY (user_id, shop_id, ds)";
+ Configuration configuration = new Configuration();
+ configuration.set(
+
MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD,
Review Comment:
As above comment, we don't need to set this option manually.
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -158,36 +164,107 @@ void testCreateMaterializedTableInContinuousMode()
throws Exception {
}
@Test
- void testCreateMaterializedTableInFullMode() {
+ void testCreateMaterializedTableInFullMode() throws Exception {
+ String dataId =
TestValuesTableFactory.registerData(Collections.emptyList());
+ String sourceDdl =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS my_source (\n"
+ + " order_id BIGINT,\n"
+ + " user_id BIGINT,\n"
+ + " shop_id BIGINT,\n"
+ + " order_created_at STRING\n"
+ + ")\n"
+ + "WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true',\n"
+ + " 'data-id' = '%s'\n"
+ + ")",
+ dataId);
+
+ OperationHandle sourceHandle =
+ service.executeStatement(sessionHandle, sourceDdl, -1, new
Configuration());
+ awaitOperationTermination(service, sessionHandle, sourceHandle);
+
String materializedTableDDL =
"CREATE MATERIALIZED TABLE users_shops"
+ " PARTITIONED BY (ds)\n"
+ " WITH(\n"
+ + " 'partition.fields.ds.date-formatter' =
'yyyy-MM-dd',\n"
+ " 'format' = 'debezium-json'\n"
+ " )\n"
- + " FRESHNESS = INTERVAL '1' DAY\n"
+ + " FRESHNESS = INTERVAL '1' MINUTE\n"
+ " AS SELECT \n"
+ " user_id,\n"
+ " shop_id,\n"
+ " ds,\n"
- + " SUM (payment_amount_cents) AS
payed_buy_fee_sum,\n"
- + " SUM (1) AS pv\n"
+ + " COUNT(order_id) AS order_cnt\n"
+ " FROM (\n"
- + " SELECT user_id, shop_id,
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM
datagenSource"
+ + " SELECT user_id, shop_id, order_created_at AS
ds, order_id FROM my_source"
+ " ) AS tmp\n"
+ " GROUP BY (user_id, shop_id, ds)";
+ Configuration configuration = new Configuration();
+ configuration.set(
+
MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD,
+ Duration.ofSeconds(30));
OperationHandle materializedTableHandle =
- service.executeStatement(
- sessionHandle, materializedTableDDL, -1, new
Configuration());
+ service.executeStatement(sessionHandle, materializedTableDDL,
-1, configuration);
+ awaitOperationTermination(service, sessionHandle,
materializedTableHandle);
- assertThatThrownBy(
- () ->
- awaitOperationTermination(
- service, sessionHandle,
materializedTableHandle))
- .rootCause()
- .isInstanceOf(SqlExecutionException.class)
- .hasMessage(
- "Only support create materialized table in continuous
refresh mode currently.");
+ // verify materialized table is created
+ ResolvedCatalogMaterializedTable actualMaterializedTable =
+ (ResolvedCatalogMaterializedTable)
+ service.getTable(
+ sessionHandle,
+ ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops"));
+
+ // verify refresh mode
+ assertThat(actualMaterializedTable.getRefreshMode())
+ .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL);
+
+ // verify refresh handler
+ byte[] serializedHandler =
actualMaterializedTable.getSerializedRefreshHandler();
+ EmbeddedRefreshHandler embeddedRefreshHandler =
+ EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+ serializedHandler, getClass().getClassLoader());
+ assertThat(embeddedRefreshHandler.getWorkflowName())
+ .isEqualTo(
+ "quartz_job_"
+ + ObjectIdentifier.of(
+ fileSystemCatalogName,
+ TEST_DEFAULT_DATABASE,
+ "users_shops")
+ .asSerializableString());
+
+ EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+ SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+ .getSqlGatewayRestEndpoint()
+ .getQuartzScheduler();
+ JobKey jobKey =
+ new JobKey(
+ embeddedRefreshHandler.getWorkflowName(),
+ embeddedRefreshHandler.getWorkflowGroup());
+
+ // wait scheduler to start
+ CommonTestUtils.waitUtil(
+ () -> {
+ try {
+ return
embeddedWorkflowScheduler.getQuartzScheduler().isStarted();
Review Comment:
This wait goal is what? The scheduler is started after SqlGateway starts.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
+ private final String address;
+
+ private final RestClient restClient;
+
+ public SqlGatewayRestClient(String endpointUrl) throws Exception {
+ URL url = new URL(endpointUrl);
+ this.address = url.getHost();
+ this.port = url.getPort();
+ this.restClient =
+ RestClient.forUrl(
+ new Configuration(),
Executors.newFixedThreadPool(1), url);
+ }
+
+ public SessionHandle openSession(String sessionName) throws
Exception {
+ Map<String, String> properties = Collections.emptyMap();
+
+ OpenSessionRequestBody requestBody =
+ new OpenSessionRequestBody(sessionName, properties);
+ OpenSessionHeaders headers = OpenSessionHeaders.getInstance();
+
+ OpenSessionResponseBody responseBody =
+ restClient
+ .sendRequest(
+ address,
+ port,
+ headers,
+ EmptyMessageParameters.getInstance(),
+ requestBody)
+ .get();
+
+ return new
SessionHandle(UUID.fromString(responseBody.getSessionHandle()));
+ }
+
+ public void closeSession(SessionHandle sessionHandle) throws
Exception {
+ // Close session
+ CloseSessionHeaders closeSessionHeaders =
CloseSessionHeaders.getInstance();
+ SessionMessageParameters sessionMessageParameters =
+ new SessionMessageParameters(sessionHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeSessionHeaders,
+ sessionMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public void closeOperation(SessionHandle sessionHandle,
OperationHandle operationHandle)
+ throws Exception {
+ // Close operation
+ CloseOperationHeaders closeOperationHeaders =
CloseOperationHeaders.getInstance();
+ OperationMessageParameters operationMessageParameters =
+ new OperationMessageParameters(sessionHandle,
operationHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeOperationHeaders,
+ operationMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public OperationHandle refreshMaterializedTable(
+ SessionHandle sessionHandle,
+ String materializedTableIdentifier,
+ boolean ignoreFailures,
+ String schedulerTime,
+ Map<String, String> dynamicOptions,
+ Map<String, String> staticPartitions,
+ Map<String, String> executionConfig)
+ throws Exception {
+
+ RefreshMaterializedTableRequestBody requestBody =
+ new RefreshMaterializedTableRequestBody(
+ true,
+ schedulerTime,
+ dynamicOptions,
+ staticPartitions,
+ executionConfig);
+ RefreshMaterializedTableHeaders headers =
+ RefreshMaterializedTableHeaders.getInstance();
+ RefreshMaterializedTableParameters parameters =
+ new RefreshMaterializedTableParameters(
+ sessionHandle, materializedTableIdentifier);
+
+ RefreshMaterializedTableResponseBody responseBody =
+ restClient
+ .sendRequest(address, port, headers,
parameters, requestBody)
+ .get();
+
+ return new
OperationHandle(UUID.fromString(responseBody.getOperationHandle()));
+ }
+
+ public List<RowData> fetchOperationAllResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle) throws Exception {
+ Long token = 0L;
+ List<RowData> results = new ArrayList<>();
+ while (token != null) {
+ FetchResultsResponseBody responseBody =
+ fetchOperationResults(sessionHandle,
operationHandle, token);
+ if (responseBody instanceof NotReadyFetchResultResponse) {
+ continue;
+ }
+ responseBody.getNextResultUri();
+ results.addAll(responseBody.getResults().getData());
+ token =
SqlGatewayRestEndpointUtils.parseToken(responseBody.getNextResultUri());
+ }
+ return results;
+ }
+
+ public FetchResultsResponseBody fetchOperationResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle, Long token)
+ throws Exception {
+ FetchResultsMessageParameters fetchResultsMessageParameters =
+ new FetchResultsMessageParameters(
+ sessionHandle, operationHandle, token,
RowFormat.PLAIN_TEXT);
Review Comment:
Why it is not `RowFormat.JSON`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]