yunqingmoswu commented on code in PR #6302:
URL: https://github.com/apache/inlong/pull/6302#discussion_r1006350518
##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java:
##########
@@ -92,14 +96,16 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context
context) {
.setExecutionOptions(executionOptions)
.setDatabasePattern(databasePattern)
.setTablePattern(tablePattern)
- .setDynamicSchemaFormat(sinkMultipleFormat);
+ .setDynamicSchemaFormat(sinkMultipleFormat)
Review Comment:
The metrics is also required for the normal sink.
##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -124,6 +149,11 @@ public void open(int taskNumber, int numTasks) throws
IOException {
executionOptions.getStreamLoadProp());
jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ }
+ this.numPendingRequests = new AtomicLong(0);
Review Comment:
Maybe it is not necessary here?
##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -259,10 +290,14 @@ public synchronized void flush() throws IOException {
for (Entry<String, List> kvs : batchMap.entrySet()) {
load(kvs.getKey(),
OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
}
+ numPendingRequests.set(0);
}
private void load(String tableIdentifier, String result) throws
IOException {
String[] tableWithDb = tableIdentifier.split("\\.");
+ if (metricData != null) {
+ metricData.invokeWithEstimate(result);
+ }
Review Comment:
The dirty metrics statitic is also required?
##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -298,6 +333,37 @@ private String getBackend() throws IOException {
}
}
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
+ Gson gson = new Gson();
+ LOG.info("snapshotState begin, context is:{}",
gson.toJson(functionSnapshotContext));
+ while (numPendingRequests.get() != 0) {
+ flush();
+ }
+ if (metricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState,
metricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ LOG.info("snapshotState end, context is:{}",
gson.toJson(functionSnapshotContext));
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws
Exception {
+ Gson gson = new Gson();
+ LOG.info("initializeState begin, context is:{}", gson.toJson(context));
Review Comment:
It is recommend use the json handle mode that has exists not the 'Gson' and
also it is not necessary for log here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]