hackergin commented on code in PR #24866:
URL: https://github.com/apache/flink/pull/24866#discussion_r1622183393
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -214,16 +248,230 @@ private void checkJobExists(JobKey jobKey, String
errorMsg)
}
}
+ @VisibleForTesting
+ public Scheduler getQuartzScheduler() {
+ return quartzScheduler;
+ }
+
/** The {@link Job} implementation for embedded quartz scheduler. */
- private class EmbeddedSchedulerJob implements Job {
+ public static class EmbeddedSchedulerJob implements Job {
+
+ public EmbeddedSchedulerJob() {}
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
- String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
- WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
- // TODO: implement the refresh operation for materialized table,
see FLINK-35348
- LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ SessionHandle sessionHandle = null;
+ OperationHandle operationHandle = null;
+ SqlGatewayRestClient gatewayRestClient = null;
+ try {
+ JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ String workflowJsonStr = dataMap.getString(WORKFLOW_INFO);
+ WorkflowInfo workflowInfo = fromJson(workflowJsonStr,
WorkflowInfo.class);
+ LOG.info("Execute refresh operation for workflow: {}.",
workflowInfo);
+
+ Configuration configuration = new Configuration();
+ RestClient restClient =
+ RestClient.forUrl(
+ configuration,
+ Executors.newFixedThreadPool(1),
+ new URL(workflowInfo.getRestEndpointUrl()));
+
+ String schedulerTime =
dateToString(context.getScheduledFireTime());
+
+ gatewayRestClient = new
SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
+ sessionHandle =
+ gatewayRestClient.openSession(
+ String.format(
+ "%s-quartz-refresh-session-%s",
+
workflowInfo.getMaterializedTableIdentifier(),
+ schedulerTime));
+ operationHandle =
+ gatewayRestClient.refreshMaterializedTable(
+ sessionHandle,
+ workflowInfo.getMaterializedTableIdentifier(),
+ true,
+ schedulerTime,
+ workflowInfo.getDynamicOptions(),
+ Collections.emptyMap(),
+ workflowInfo.getExecutionConfig());
+
+ List<RowData> results =
+
gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
+
+ String jobId = results.get(0).getString(0).toString();
+ LOG.info(
+ "Successfully executed refresh operation for
materialized table: {} with job id: {}.",
+ workflowInfo.getMaterializedTableIdentifier(),
+ jobId);
+
+ context.setResult(
+ "Successfully executed refresh operation for
materialized table: "
+ + workflowInfo.getMaterializedTableIdentifier()
+ + " with job id: "
+ + jobId);
+ // TODO wait for the job to finish
+ } catch (Exception e) {
+ LOG.error("Failed to execute refresh operation for workflow.",
e);
+ throw new JobExecutionException(e.getMessage(), e);
+ } finally {
+ try {
+ if (gatewayRestClient != null) {
+ if (operationHandle != null) {
+ gatewayRestClient.closeOperation(sessionHandle,
operationHandle);
+ }
+ if (sessionHandle != null) {
+ gatewayRestClient.closeSession(sessionHandle);
+ }
+ gatewayRestClient.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to close session.", e);
+ }
+ }
+ }
+
+ /** A simple rest client for gateway rest endpoint. */
+ private static class SqlGatewayRestClient implements AutoCloseable {
+
+ private final int port;
+ private final String address;
+
+ private final RestClient restClient;
+
+ public SqlGatewayRestClient(String endpointUrl) throws Exception {
+ URL url = new URL(endpointUrl);
+ this.address = url.getHost();
+ this.port = url.getPort();
+ this.restClient =
+ RestClient.forUrl(
+ new Configuration(),
Executors.newFixedThreadPool(1), url);
+ }
+
+ public SessionHandle openSession(String sessionName) throws
Exception {
+ Map<String, String> properties = Collections.emptyMap();
+
+ OpenSessionRequestBody requestBody =
+ new OpenSessionRequestBody(sessionName, properties);
+ OpenSessionHeaders headers = OpenSessionHeaders.getInstance();
+
+ OpenSessionResponseBody responseBody =
+ restClient
+ .sendRequest(
+ address,
+ port,
+ headers,
+ EmptyMessageParameters.getInstance(),
+ requestBody)
+ .get();
+
+ return new
SessionHandle(UUID.fromString(responseBody.getSessionHandle()));
+ }
+
+ public void closeSession(SessionHandle sessionHandle) throws
Exception {
+ // Close session
+ CloseSessionHeaders closeSessionHeaders =
CloseSessionHeaders.getInstance();
+ SessionMessageParameters sessionMessageParameters =
+ new SessionMessageParameters(sessionHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeSessionHeaders,
+ sessionMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public void closeOperation(SessionHandle sessionHandle,
OperationHandle operationHandle)
+ throws Exception {
+ // Close operation
+ CloseOperationHeaders closeOperationHeaders =
CloseOperationHeaders.getInstance();
+ OperationMessageParameters operationMessageParameters =
+ new OperationMessageParameters(sessionHandle,
operationHandle);
+ restClient
+ .sendRequest(
+ address,
+ port,
+ closeOperationHeaders,
+ operationMessageParameters,
+ EmptyRequestBody.getInstance())
+ .get();
+ }
+
+ public OperationHandle refreshMaterializedTable(
+ SessionHandle sessionHandle,
+ String materializedTableIdentifier,
+ boolean ignoreFailures,
+ String schedulerTime,
+ Map<String, String> dynamicOptions,
+ Map<String, String> staticPartitions,
+ Map<String, String> executionConfig)
+ throws Exception {
+
+ RefreshMaterializedTableRequestBody requestBody =
+ new RefreshMaterializedTableRequestBody(
+ true,
+ schedulerTime,
+ dynamicOptions,
+ staticPartitions,
+ executionConfig);
+ RefreshMaterializedTableHeaders headers =
+ RefreshMaterializedTableHeaders.getInstance();
+ RefreshMaterializedTableParameters parameters =
+ new RefreshMaterializedTableParameters(
+ sessionHandle, materializedTableIdentifier);
+
+ RefreshMaterializedTableResponseBody responseBody =
+ restClient
+ .sendRequest(address, port, headers,
parameters, requestBody)
+ .get();
+
+ return new
OperationHandle(UUID.fromString(responseBody.getOperationHandle()));
+ }
+
+ public List<RowData> fetchOperationAllResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle) throws Exception {
+ Long token = 0L;
+ List<RowData> results = new ArrayList<>();
+ while (token != null) {
+ FetchResultsResponseBody responseBody =
+ fetchOperationResults(sessionHandle,
operationHandle, token);
+ if (responseBody instanceof NotReadyFetchResultResponse) {
+ continue;
+ }
+ responseBody.getNextResultUri();
+ results.addAll(responseBody.getResults().getData());
+ token =
SqlGatewayRestEndpointUtils.parseToken(responseBody.getNextResultUri());
+ }
+ return results;
+ }
+
+ public FetchResultsResponseBody fetchOperationResults(
+ SessionHandle sessionHandle, OperationHandle
operationHandle, Long token)
+ throws Exception {
+ FetchResultsMessageParameters fetchResultsMessageParameters =
+ new FetchResultsMessageParameters(
+ sessionHandle, operationHandle, token,
RowFormat.PLAIN_TEXT);
Review Comment:
Changed to RowFormat.JSON instead of RowFormat.PLAIN_TEXT. The JSON row format
retains each original field and its type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]