This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d71e79013 [INLONG-7809][Sort] ES multiple sink support dirty data
runtime strategies (#7812)
d71e79013 is described below
commit d71e7901339773f2be241a929243e69a317c6033
Author: Yizhou Yang <[email protected]>
AuthorDate: Tue Apr 11 15:59:56 2023 +0800
[INLONG-7809][Sort] ES multiple sink support dirty data runtime strategies
(#7812)
Co-authored-by: Yizhou Yang <[email protected]>
---
.../sort/base/metric/sub/SinkTableMetricData.java | 48 ++++++++++++
.../sort/elasticsearch6/ElasticsearchSink.java | 14 +++-
.../table/Elasticsearch6DynamicSink.java | 1 +
.../sort/elasticsearch7/ElasticsearchSink.java | 14 +++-
.../table/Elasticsearch7DynamicSink.java | 1 +
.../sort/elasticsearch/ElasticsearchSinkBase.java | 13 +++-
.../MultipleElasticsearchSinkFunctionBase.java | 91 +++++++++++++---------
7 files changed, 138 insertions(+), 44 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index 40a869d87..92f9da48f 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -223,6 +223,30 @@ public class SinkTableMetricData extends SinkMetricData
implements SinkSubMetric
subSinkMetricData.invoke(rowCount, rowSize);
}
+ /**
+ * output metrics
+ *
+ * @param index the index name of record
+ * @param rowCount the row count of records
+ * @param rowSize the row size of records
+ */
+ public void outputMetrics(String index, long rowCount, long rowSize) {
+ if (StringUtils.isBlank(index)) {
+ invoke(rowCount, rowSize);
+ return;
+ }
+ SinkMetricData subSinkMetricData;
+ if (subSinkMetricMap.containsKey(index)) {
+ subSinkMetricData = subSinkMetricMap.get(index);
+ } else {
+ subSinkMetricData = buildSubSinkMetricData(new String[]{index},
this);
+ subSinkMetricMap.put(index, subSinkMetricData);
+ }
+ // sink metric and sub sink metric output metrics
+ this.invoke(rowCount, rowSize);
+ subSinkMetricData.invoke(rowCount, rowSize);
+ }
+
/**
* output dirty metrics with estimate
*
@@ -336,6 +360,30 @@ public class SinkTableMetricData extends SinkMetricData
implements SinkSubMetric
subSinkMetricData.invokeDirty(rowCount, rowSize);
}
+ /**
+ * output dirty metrics
+ *
+ * @param index the index name of record
+ * @param rowCount the row count of records
+ * @param rowSize the row size of records
+ */
+ public void outputDirtyMetrics(String index, long rowCount, long rowSize) {
+ if (StringUtils.isBlank(index)) {
+ invokeDirty(rowCount, rowSize);
+ return;
+ }
+ SinkMetricData subSinkMetricData;
+ if (subSinkMetricMap.containsKey(index)) {
+ subSinkMetricData = subSinkMetricMap.get(index);
+ } else {
+ subSinkMetricData = buildSubSinkMetricData(new String[]{index},
this);
+ subSinkMetricMap.put(index, subSinkMetricData);
+ }
+ // sink metric and sub sink metric output dirty metrics
+ this.invokeDirty(rowCount, rowSize);
+ subSinkMetricData.invokeDirty(rowCount, rowSize);
+ }
+
/**
* output dirty metrics with estimate
*
diff --git
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
index 15f60bfb8..e4f8ff37b 100644
---
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -83,14 +83,16 @@ public class ElasticsearchSink<T>
RestClientFactory restClientFactory,
String inlongMetric,
DirtySinkHelper<Object> dirtySinkHelper,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ boolean multipleSink) {
super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
bulkRequestsConfig,
elasticsearchSinkFunction,
failureHandler,
inlongMetric,
dirtySinkHelper,
- auditHostAndPorts);
+ auditHostAndPorts,
+ multipleSink);
}
@Override
@@ -131,6 +133,7 @@ public class ElasticsearchSink<T>
private String inlongMetric = null;
private DirtySinkHelper<Object> dirtySinkHelper;
private String auditHostAndPorts;
+ private boolean multipleSink;
/**
* Creates a new {@code ElasticsearchSink} that connects to the
cluster using a {@link
@@ -286,6 +289,10 @@ public class ElasticsearchSink<T>
this.restClientFactory =
Preconditions.checkNotNull(restClientFactory);
}
+ public void setMultipleSink(boolean multipleSink) {
+ this.multipleSink = multipleSink;
+ }
+
/**
* Creates the Elasticsearch sink.
* Use {@link DirtySinkFailureHandler} when need sink dirty data
@@ -304,7 +311,8 @@ public class ElasticsearchSink<T>
restClientFactory,
inlongMetric,
dirtySinkHelper,
- auditHostAndPorts);
+ auditHostAndPorts,
+ multipleSink);
}
@Override
diff --git
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index 3372896f5..15bed1d97 100644
---
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -174,6 +174,7 @@ final class Elasticsearch6DynamicSink implements
DynamicTableSink {
builder.setRestClientFactory(
new
DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
}
+ builder.setMultipleSink(multipleSink);
final ElasticsearchSink<RowData> sink = builder.build();
if (config.isDisableFlushOnCheckpoint()) {
sink.disableFlushOnCheckpoint();
diff --git
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
index ead02098e..5719dbbc4 100644
---
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -83,14 +83,16 @@ public class ElasticsearchSink<T>
RestClientFactory restClientFactory,
String inlongMetric,
DirtySinkHelper<Object> dirtySinkHelper,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ boolean multipleSink) {
super(new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
bulkRequestsConfig,
elasticsearchSinkFunction,
failureHandler,
inlongMetric,
dirtySinkHelper,
- auditHostAndPorts);
+ auditHostAndPorts,
+ multipleSink);
}
@Override
@@ -131,6 +133,7 @@ public class ElasticsearchSink<T>
private String inlongMetric = null;
private DirtySinkHelper<Object> dirtySinkHelper;
private String auditHostAndPorts;
+ private boolean multipleSink;
/**
* Creates a new {@code ElasticsearchSink} that connects to the
cluster using a {@link
@@ -286,6 +289,10 @@ public class ElasticsearchSink<T>
this.restClientFactory =
Preconditions.checkNotNull(restClientFactory);
}
+ public void setMultipleSink(boolean multipleSink) {
+ this.multipleSink = multipleSink;
+ }
+
/**
* Creates the Elasticsearch sink.
* Use {@link DirtySinkFailureHandler} when need sink dirty data
@@ -304,7 +311,8 @@ public class ElasticsearchSink<T>
restClientFactory,
inlongMetric,
dirtySinkHelper,
- auditHostAndPorts);
+ auditHostAndPorts,
+ multipleSink);
}
@Override
diff --git
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index 05e34ee4b..74f74e1a8 100644
---
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -207,6 +207,7 @@ final class Elasticsearch7DynamicSink implements
DynamicTableSink {
builder.setRestClientFactory(
new
DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
}
+ builder.setMultipleSink(multipleSink);
final ElasticsearchSink<RowData> sink = builder.build();
if (config.isDisableFlushOnCheckpoint()) {
sink.disableFlushOnCheckpoint();
diff --git
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index b0efe1888..65faddf6a 100644
---
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import java.io.Serializable;
@@ -160,9 +161,10 @@ public abstract class ElasticsearchSinkBase<T, Request,
Builder, Listener, BulkI
* Bulk processor to buffer and send requests to Elasticsearch, created
using the client.
*/
private transient BulkProcessor bulkProcessor;
- private SinkMetricData sinkMetricData;
+ private SinkTableMetricData sinkMetricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
+ private final boolean multipleSink;
public ElasticsearchSinkBase(
ElasticsearchApiCallBridge<Request, Builder, Listener,
BulkItemResponse, BulkProcessor, C> callBridge,
@@ -171,13 +173,15 @@ public abstract class ElasticsearchSinkBase<T, Request,
Builder, Listener, BulkI
ActionRequestFailureHandler<Request> failureHandler,
String inlongMetric,
DirtySinkHelper<Object> dirtySinkHelper,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ boolean multipleSink) {
this.inlongMetric = inlongMetric;
this.dirtySinkHelper = dirtySinkHelper;
this.callBridge = checkNotNull(callBridge);
this.elasticsearchSinkFunction =
checkNotNull(elasticsearchSinkFunction);
this.failureHandler = checkNotNull(failureHandler);
this.auditHostAndPorts = auditHostAndPorts;
+ this.multipleSink = multipleSink;
// we eagerly check if the user-provided sink function and failure
handler is serializable;
// otherwise, if they aren't serializable, users will merely get a
non-informative error
// message
@@ -266,7 +270,10 @@ public abstract class ElasticsearchSinkBase<T, Request,
Builder, Listener, BulkI
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sinkMetricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ sinkMetricData = new SinkTableMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ if (multipleSink) {
+ sinkMetricData.registerSubMetricsGroup(metricState);
+ }
}
dirtySinkHelper.open(parameters);
Listener listener = createListener();
diff --git
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java
index 4e54e553e..2ff512704 100644
---
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java
+++
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/MultipleElasticsearchSinkFunctionBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import java.util.HashSet;
import java.util.UUID;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
@@ -36,6 +37,7 @@ import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
import org.apache.inlong.sort.elasticsearch.RequestIndexer;
@@ -74,9 +76,11 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
// open and store an index generator for each new index.
private Map<String, IndexGenerator> indexGeneratorMap;
// table level metrics
- private SinkMetricData sinkMetricData;
+ private SinkTableMetricData sinkMetricData;
private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
private transient SerializationSchema<RowData> serializationSchema;
+ // a hashset containing indices which are skipped due to exceptions.
+ private final HashSet<String> errorSet = new HashSet<>();
public MultipleElasticsearchSinkFunctionBase(
@Nullable String docType, // this is deprecated in es 7+
@@ -106,12 +110,12 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
@Override
public void open(RuntimeContext ctx, SinkMetricData sinkMetricData) {
indexGeneratorMap = new HashMap<>();
- this.sinkMetricData = sinkMetricData;
+ this.sinkMetricData = (SinkTableMetricData) sinkMetricData;
}
- private void sendMetrics(byte[] document) {
+ private void sendMetrics(byte[] document, String index) {
if (sinkMetricData != null) {
- sinkMetricData.invoke(1, document.length);
+ sinkMetricData.outputMetrics(index, 1, document.length);
}
}
@@ -125,7 +129,7 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
@Override
public void process(RowData element, RuntimeContext ctx,
RequestIndexer<Request> indexer) {
- JsonNode rootNode = null;
+ JsonNode rootNode;
// parse rootnode
try {
jsonDynamicSchemaFormat =
@@ -155,10 +159,15 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
document = serializationSchema.serialize(data);
} catch (Exception e) {
LOGGER.error(String.format("Serialize error, raw data: %s", data),
e);
- dirtySinkHelper.invoke(data, DirtyType.SERIALIZE_ERROR, e);
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1,
data.toString().getBytes(StandardCharsets.UTF_8).length);
- }
+ handleDirty(data, DirtyType.SERIALIZE_ERROR, e, null);
+ return;
+ }
+ final String index;
+ try {
+ index = parseIndex(data, rootNode);
+ } catch (Exception e) {
+ LOGGER.error(String.format("Generate index error, raw data: %s",
data), e);
+ handleDirty(data, DirtyType.INDEX_GENERATE_ERROR, e, null);
return;
}
final String key;
@@ -168,26 +177,44 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
key =
UUID.nameUUIDFromBytes(physicalData.toString().getBytes(StandardCharsets.UTF_8)).toString();
} catch (Exception e) {
LOGGER.error(String.format("Generate index id error, raw data:
%s", data), e);
- dirtySinkHelper.invoke(data, DirtyType.INDEX_ID_GENERATE_ERROR, e);
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1, document.length);
- }
+ handleDirty(data, DirtyType.INDEX_ID_GENERATE_ERROR, e, index);
return;
}
- final String index;
- try {
- index = parseIndex(data, rootNode);
- } catch (Exception e) {
- LOGGER.error(String.format("Generate index error, raw data: %s",
data), e);
- dirtySinkHelper.invoke(data, DirtyType.INDEX_GENERATE_ERROR, e);
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1, document.length);
- }
+ // if the index is contained in errorSet, then skip this record.
+ if (errorSet.contains(index)) {
return;
}
addDocument(data, key, index, document, indexer);
}
+ private void handleDirty(RowData rowData, DirtyType dirtyType, Exception
e, String index) {
+ // skip the index in which the error has occurred
+ if (SchemaUpdateExceptionPolicy.STOP_PARTIAL ==
schemaUpdateExceptionPolicy) {
+ if (index != null) {
+ errorSet.add(index);
+ } else {
+ return;
+ }
+ }
+
+ // keep retrying the entire task until it succeeds
+ if (SchemaUpdateExceptionPolicy.THROW_WITH_STOP ==
schemaUpdateExceptionPolicy) {
+ throw new RuntimeException(String.format("Writing records %s
failed, restarting task",
+ rowData), e);
+ }
+
+ // dirty data & archive
+ if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
schemaUpdateExceptionPolicy) {
+ dirtySinkHelper.invoke(rowData, dirtyType, e);
+ if (sinkMetricData != null && index != null) {
+ sinkMetricData.outputDirtyMetrics(index, 1,
+
rowData.toString().getBytes(StandardCharsets.UTF_8).length);
+ } else {
+ sinkMetricData.invokeDirty(1,
rowData.toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ }
+ }
+
private String parseIndex(RowData rowData, JsonNode rootNode)
throws Exception {
String index;
@@ -219,13 +246,13 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
request = requestFactory.createUpdateRequest(index,
docType, key, contentType, document);
if (addRouting(request, element, document)) {
indexer.add(request);
- sendMetrics(document);
+ sendMetrics(document, index);
}
} else {
request = requestFactory.createIndexRequest(index,
docType, key, contentType, document);
if (addRouting(request, element, document)) {
indexer.add(request);
- sendMetrics(document);
+ sendMetrics(document, index);
}
}
break;
@@ -233,19 +260,16 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
request = requestFactory.createDeleteRequest(index, docType,
key);
if (addRouting(request, element, document)) {
indexer.add(request);
- sendMetrics(document);
+ sendMetrics(document, index);
}
break;
case UPDATE_BEFORE:
- sendMetrics(document);
+ sendMetrics(document, index);
break;
default:
LOGGER.error(String.format("The type of element should be
'RowData' only, raw data: %s", element));
- dirtySinkHelper.invoke(element,
DirtyType.UNSUPPORTED_DATA_TYPE,
- new RuntimeException("The type of element should be
'RowData' only."));
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1, document.length);
- }
+ handleDirty(element, DirtyType.UNSUPPORTED_DATA_TYPE,
+ new RuntimeException("The type of element should be
'RowData' only."), index);
}
}
@@ -256,10 +280,7 @@ public abstract class
MultipleElasticsearchSinkFunctionBase<Request, ContentType
handleRouting(request, routing);
} catch (Exception e) {
LOGGER.error(String.format("Routing error, raw data: %s",
row), e);
- dirtySinkHelper.invoke(row, DirtyType.INDEX_ROUTING_ERROR, e);
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1, document.length);
- }
+ handleDirty(row, DirtyType.INDEX_ROUTING_ERROR, e, null);
return false;
}
}