This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 99947c98d [INLONG-6869][Sort] Supports dirty data side-output for
elasticsearch sink (#6870)
99947c98d is described below
commit 99947c98d6437a694ccd3ffd1177f88add12dd31
Author: yunqingmoswu <[email protected]>
AuthorDate: Wed Dec 14 15:23:38 2022 +0800
[INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink
(#6870)
---
.../inlong/sort/base/dirty/DirtySinkHelper.java | 108 ++++++++++++++++++
.../apache/inlong/sort/base/dirty/DirtyType.java | 16 +++
.../sort/elasticsearch6/ElasticsearchSink.java | 20 +++-
.../table/Elasticsearch6DynamicSink.java | 16 ++-
.../table/Elasticsearch6DynamicSinkFactory.java | 14 ++-
.../sort/elasticsearch7/ElasticsearchSink.java | 19 +++-
.../table/Elasticsearch7DynamicSink.java | 16 ++-
.../table/Elasticsearch7DynamicSinkFactory.java | 14 ++-
.../sort/elasticsearch/ElasticsearchSinkBase.java | 122 +++++++++++---------
.../table/RowElasticsearchSinkFunction.java | 124 ++++++++++++++-------
.../sort/parser/ElasticsearchSqlParseTest.java | 10 +-
11 files changed, 369 insertions(+), 110 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
new file mode 100644
index 000000000..a962b974e
--- /dev/null
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * Helper that decides what happens to a dirty record: rethrow its cause, hand the record to an
+ * optional {@link DirtySink} side output, or drop it, depending on {@link DirtyOptions}.
+ *
+ * <p>The dirty sink may be {@code null}; every use of it in this class is null-checked.
+ *
+ * @param <T> The type of the dirty data passed to the dirty sink
+ */
+public class DirtySinkHelper<T> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DirtySinkHelper.class);
+
+ private DirtyOptions dirtyOptions; // not final: replaceable through setDirtyOptions
+ private final @Nullable DirtySink<T> dirtySink; // null means there is no side output
+
+ public DirtySinkHelper(DirtyOptions dirtyOptions, @Nullable DirtySink<T>
dirtySink) { // dirtyOptions is required (checked below), dirtySink is optional
+ this.dirtyOptions = Preconditions.checkNotNull(dirtyOptions,
"dirtyOptions is null");
+ this.dirtySink = dirtySink;
+ }
+
+ /**
+ * Opens the dirty sink if one is present; does nothing when the dirty sink is {@code null}.
+ * Any exception thrown by {@link DirtySink#open} is rethrown wrapped in a
+ * {@link RuntimeException}.
+ *
+ * @param configuration The configuration that is passed to the dirty sink
+ */
+ public void open(Configuration configuration) {
+ if (dirtySink != null) {
+ try {
+ dirtySink.open(configuration);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Handles one dirty record.
+ *
+ * <p>When {@code dirtyOptions.ignoreDirty()} is false the cause is rethrown (unchanged if it
+ * already is a {@link RuntimeException}, wrapped otherwise) and the dirty sink is not used.
+ * Otherwise, if a dirty sink is present, a {@link DirtyData} is built from the record, the
+ * dirty type, the cause's message and the labels, log tag and identifier taken from the
+ * options, and is passed to the dirty sink. If no dirty sink is present the record is dropped.
+ *
+ * <p>An exception raised while building or writing the dirty data is rethrown wrapped in a
+ * {@link RuntimeException} unless {@code dirtyOptions.ignoreSideOutputErrors()} is true, in
+ * which case it is only logged at WARN level.
+ *
+ * <p>NOTE(review): {@code e} is dereferenced via {@code e.getMessage()}, so callers are
+ * expected to pass a non-null cause - confirm at the call sites.
+ *
+ * @param dirtyData The dirty data
+ * @param dirtyType The dirty type {@link DirtyType}
+ * @param e The cause of dirty data
+ */
+ public void invoke(T dirtyData, DirtyType dirtyType, Throwable e) {
+ if (!dirtyOptions.ignoreDirty()) { // dirty data is not tolerated: propagate the cause
+ RuntimeException ex;
+ if (e instanceof RuntimeException) {
+ ex = (RuntimeException) e;
+ } else {
+ ex = new RuntimeException(e);
+ }
+ throw ex;
+ }
+ if (dirtySink != null) { // tolerated dirty data is side-output only when a sink exists
+ DirtyData.Builder<T> builder = DirtyData.builder();
+ try {
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setDirtyMessage(e.getMessage())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) { // covers both building the DirtyData and the sink call
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOGGER.warn("Dirty sink failed", ex); // side-output errors are tolerated: log and continue
+ }
+ }
+ }
+
+ public void setDirtyOptions(DirtyOptions dirtyOptions) { // unlike the constructor, performs no null check
+ this.dirtyOptions = dirtyOptions;
+ }
+
+ public DirtyOptions getDirtyOptions() {
+ return dirtyOptions;
+ }
+
+ @Nullable
+ public DirtySink<T> getDirtySink() { // may return null, see the field annotation
+ return dirtySink;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
index 0637725c3..89789872a 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -88,6 +88,22 @@ public enum DirtyType {
* Extract RowData error
*/
EXTRACT_ROWDATA_ERROR("ExtractRowDataError"),
+ /**
+ * Index generate error
+ */
+ INDEX_GENERATE_ERROR("IndexGenerateError"),
+ /**
+ * Index id generate error
+ */
+ INDEX_ID_GENERATE_ERROR("IndexIdGenerateError"),
+ /**
+ * Index routing error
+ */
+ INDEX_ROUTING_ERROR("IndexRoutingError"),
+ /**
+ * Document parse error
+ */
+ DOCUMENT_PARSE_ERROR("DocumentParseError"),
;
private final String format;
diff --git
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
index f33fb7fa4..5ada044cb 100644
---
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -23,6 +23,7 @@ import
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandl
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.flink.util.Preconditions;
import org.apache.http.HttpHost;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
import org.elasticsearch.action.ActionRequest;
@@ -70,14 +71,15 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler,
RestClientFactory restClientFactory,
- String inlongMetric) {
-
+ String inlongMetric,
+ DirtySinkHelper<Object> dirtySinkHelper) {
super(
new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
bulkRequestsConfig,
elasticsearchSinkFunction,
failureHandler,
- inlongMetric);
+ inlongMetric,
+ dirtySinkHelper);
}
/**
@@ -96,6 +98,7 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
private RestClientFactory restClientFactory = restClientBuilder -> {
};
private String inlongMetric = null;
+ private DirtySinkHelper<Object> dirtySinkHelper;
/**
* Creates a new {@code ElasticsearchSink} that connects to the
cluster using a {@link
@@ -120,6 +123,14 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
this.inlongMetric = inlongMetric;
}
+ /**
+ * Sets the dirty sink helper kept in this object's {@code dirtySinkHelper} field.
+ *
+ * <p>NOTE(review): the field has no initializer, so it stays {@code null} unless this setter
+ * is called, while {@code ElasticsearchSinkBase#open} calls {@code dirtySinkHelper.open(...)}
+ * without a null check. Presumably every user of this builder must call this setter -
+ * confirm.
+ *
+ * @param dirtySinkHelper The dirty sink helper
+ */
+ public void setDirtySinkHelper(DirtySinkHelper<Object>
dirtySinkHelper) {
+ this.dirtySinkHelper = dirtySinkHelper;
+ }
+
/**
* Sets the maximum number of actions to buffer for each bulk request.
You can pass -1 to
* disable it.
@@ -244,7 +255,8 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
elasticsearchSinkFunction,
failureHandler,
restClientFactory,
- inlongMetric);
+ inlongMetric,
+ dirtySinkHelper);
}
@Override
diff --git
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index 0b0a3cf58..61dfdb3cb 100644
---
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -35,6 +35,7 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
@@ -67,6 +68,7 @@ final class Elasticsearch6DynamicSink implements
DynamicTableSink {
private final String inlongMetric;
private final String auditHostAndPorts;
private final ElasticSearchBuilderProvider builderProvider;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
// --------------------------------------------------------------
// Hack to make configuration testing possible.
@@ -83,8 +85,10 @@ final class Elasticsearch6DynamicSink implements
DynamicTableSink {
Elasticsearch6Configuration config,
TableSchema schema,
String inlongMetric,
- String auditHostAndPorts) {
- this(format, config, schema, (ElasticsearchSink.Builder::new),
inlongMetric, auditHostAndPorts);
+ String auditHostAndPorts,
+ DirtySinkHelper<Object> dirtySinkHelper) {
+ this(format, config, schema, (ElasticsearchSink.Builder::new),
+ inlongMetric, auditHostAndPorts, dirtySinkHelper);
}
Elasticsearch6DynamicSink(
@@ -93,13 +97,15 @@ final class Elasticsearch6DynamicSink implements
DynamicTableSink {
TableSchema schema,
ElasticSearchBuilderProvider builderProvider,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtySinkHelper<Object> dirtySinkHelper) {
this.format = format;
this.schema = schema;
this.config = config;
this.builderProvider = builderProvider;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.dirtySinkHelper = dirtySinkHelper;
}
@Override
@@ -134,7 +140,8 @@ final class Elasticsearch6DynamicSink implements
DynamicTableSink {
RoutingExtractor.createRoutingExtractor(
schema,
config.getRoutingField().orElse(null)),
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtySinkHelper);
final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(),
upsertFunction);
@@ -145,6 +152,7 @@ final class Elasticsearch6DynamicSink implements
DynamicTableSink {
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
builder.setInLongMetric(inlongMetric);
+ builder.setDirtySinkHelper(dirtySinkHelper);
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
diff --git
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
index 01833cd3c..501d3ccd7 100644
---
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+++
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -32,6 +32,10 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
import java.util.Set;
@@ -39,6 +43,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
@@ -99,7 +104,7 @@ public class Elasticsearch6DynamicSinkFactory implements
DynamicTableSinkFactory
final EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
- helper.validate();
+ helper.validateExcept(DIRTY_PREFIX);
Configuration configuration = new Configuration();
context.getCatalogTable().getOptions().forEach(configuration::setString);
Elasticsearch6Configuration config =
@@ -110,9 +115,12 @@ public class Elasticsearch6DynamicSinkFactory implements
DynamicTableSinkFactory
String inlongMetric =
helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts =
helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
-
+ final DirtyOptions dirtyOptions =
DirtyOptions.fromConfig(helper.getOptions());
+ final DirtySink<Object> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+ final DirtySinkHelper<Object> dirtySinkHelper = new
DirtySinkHelper<>(dirtyOptions, dirtySink);
return new Elasticsearch6DynamicSink(
- format, config,
TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric,
auditHostAndPorts);
+ format, config,
TableSchemaUtils.getPhysicalSchema(tableSchema),
+ inlongMetric, auditHostAndPorts, dirtySinkHelper);
}
private void validate(Elasticsearch6Configuration config, Configuration
originalConfiguration) {
diff --git
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
index 10b7cdfb8..9dd1dd90f 100644
---
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.elasticsearch7;
import org.apache.flink.annotation.PublicEvolving;
import
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
import
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
@@ -71,14 +72,16 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler,
RestClientFactory restClientFactory,
- String inlongMetric) {
+ String inlongMetric,
+ DirtySinkHelper<Object> dirtySinkHelper) {
super(
new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
bulkRequestsConfig,
elasticsearchSinkFunction,
failureHandler,
- inlongMetric);
+ inlongMetric,
+ dirtySinkHelper);
}
/**
@@ -97,6 +100,7 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
private RestClientFactory restClientFactory = restClientBuilder -> {
};
private String inlongMetric = null;
+ private DirtySinkHelper<Object> dirtySinkHelper;
/**
* Creates a new {@code ElasticsearchSink} that connects to the
cluster using a {@link
@@ -121,6 +125,14 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
this.inlongMetric = inlongMetric;
}
+ /**
+ * Sets the dirty sink helper kept in this object's {@code dirtySinkHelper} field.
+ *
+ * <p>NOTE(review): the field has no initializer, so it stays {@code null} unless this setter
+ * is called, while {@code ElasticsearchSinkBase#open} calls {@code dirtySinkHelper.open(...)}
+ * without a null check. Presumably every user of this builder must call this setter -
+ * confirm.
+ *
+ * @param dirtySinkHelper The dirty sink helper
+ */
+ public void setDirtySinkHelper(DirtySinkHelper<Object>
dirtySinkHelper) {
+ this.dirtySinkHelper = dirtySinkHelper;
+ }
+
/**
* Sets the maximum number of actions to buffer for each bulk request.
You can pass -1 to
* disable it.
@@ -245,7 +257,8 @@ public class ElasticsearchSink<T> extends
ElasticsearchSinkBase<T, RestHighLevel
elasticsearchSinkFunction,
failureHandler,
restClientFactory,
- inlongMetric);
+ inlongMetric,
+ dirtySinkHelper);
}
@Override
diff --git
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index 29e2f1795..009283330 100644
---
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -35,6 +35,7 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
@@ -68,6 +69,7 @@ final class Elasticsearch7DynamicSink implements
DynamicTableSink {
private final String inlongMetric;
private final String auditHostAndPorts;
private final ElasticSearchBuilderProvider builderProvider;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
// --------------------------------------------------------------
// Hack to make configuration testing possible.
@@ -84,8 +86,10 @@ final class Elasticsearch7DynamicSink implements
DynamicTableSink {
Elasticsearch7Configuration config,
TableSchema schema,
String inlongMetric,
- String auditHostAndPorts) {
- this(format, config, schema, (ElasticsearchSink.Builder::new),
inlongMetric, auditHostAndPorts);
+ String auditHostAndPorts,
+ DirtySinkHelper<Object> dirtySinkHelper) {
+ this(format, config, schema, (ElasticsearchSink.Builder::new),
+ inlongMetric, auditHostAndPorts, dirtySinkHelper);
}
Elasticsearch7DynamicSink(
@@ -94,13 +98,15 @@ final class Elasticsearch7DynamicSink implements
DynamicTableSink {
TableSchema schema,
ElasticSearchBuilderProvider builderProvider,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtySinkHelper<Object> dirtySinkHelper) {
this.format = format;
this.schema = schema;
this.config = config;
this.builderProvider = builderProvider;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.dirtySinkHelper = dirtySinkHelper;
}
@Override
@@ -135,7 +141,8 @@ final class Elasticsearch7DynamicSink implements
DynamicTableSink {
RoutingExtractor.createRoutingExtractor(
schema,
config.getRoutingField().orElse(null)),
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtySinkHelper);
final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(),
upsertFunction);
@@ -146,6 +153,7 @@ final class Elasticsearch7DynamicSink implements
DynamicTableSink {
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
builder.setInLongMetric(inlongMetric);
+ builder.setDirtySinkHelper(dirtySinkHelper);
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
diff --git
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
index 1abbf1705..a1bfaa90e 100644
---
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
+++
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
@@ -32,6 +32,10 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
import java.util.Set;
@@ -39,6 +43,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
@@ -99,7 +104,7 @@ public class Elasticsearch7DynamicSinkFactory implements
DynamicTableSinkFactory
final EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
- helper.validate();
+ helper.validateExcept(DIRTY_PREFIX);
Configuration configuration = new Configuration();
context.getCatalogTable().getOptions().forEach(configuration::setString);
Elasticsearch7Configuration config =
@@ -110,9 +115,12 @@ public class Elasticsearch7DynamicSinkFactory implements
DynamicTableSinkFactory
String inlongMetric =
helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts =
helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
-
+ final DirtyOptions dirtyOptions =
DirtyOptions.fromConfig(helper.getOptions());
+ final DirtySink<Object> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+ final DirtySinkHelper<Object> dirtySinkHelper = new
DirtySinkHelper<>(dirtyOptions, dirtySink);
return new Elasticsearch7DynamicSink(
- format, config,
TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric,
auditHostAndPorts);
+ format, config,
TableSchemaUtils.getPhysicalSchema(tableSchema),
+ inlongMetric, auditHostAndPorts, dirtySinkHelper);
}
private void validate(Elasticsearch7Configuration config, Configuration
originalConfiguration) {
diff --git
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 50fc2a8f0..c21685de0 100644
---
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -28,23 +28,32 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import
org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -76,6 +85,8 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
"bulk.flush.max.actions";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
// ------------------------------------------------------------------------
// Internal bulk processor configuration
// ------------------------------------------------------------------------
@@ -125,6 +136,7 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
*/
private final AtomicReference<Throwable> failureThrowable = new
AtomicReference<>();
private final String inlongMetric;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
/**
* If true, the producer will wait until all outstanding action requests
have been sent to
* Elasticsearch.
@@ -170,8 +182,10 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
Map<String, String> userConfig,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler,
- String inlongMetric) {
+ String inlongMetric,
+ DirtySinkHelper<Object> dirtySinkHelper) {
this.inlongMetric = inlongMetric;
+ this.dirtySinkHelper = dirtySinkHelper;
this.callBridge = checkNotNull(callBridge);
this.elasticsearchSinkFunction =
checkNotNull(elasticsearchSinkFunction);
this.failureHandler = checkNotNull(failureHandler);
@@ -275,9 +289,9 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
if (metricOption != null) {
sinkMetricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
}
-
+ dirtySinkHelper.open(parameters);
callBridge.verifyClientConnection(client);
- bulkProcessor = buildBulkProcessor(new
BulkProcessorListener(sinkMetricData));
+ bulkProcessor = buildBulkProcessor(new
BulkProcessorListener(sinkMetricData, dirtySinkHelper));
requestIndexer =
callBridge.createBulkProcessorIndexer(
bulkProcessor, flushOnCheckpoint, numPendingRequests);
@@ -462,10 +476,13 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
private class BulkProcessorListener implements BulkProcessor.Listener {
- private SinkMetricData sinkMetricData;
+ private final SinkMetricData sinkMetricData;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
- public BulkProcessorListener(SinkMetricData sinkMetricData) {
+ public BulkProcessorListener(SinkMetricData sinkMetricData,
+ DirtySinkHelper<Object> dirtySinkHelper) {
this.sinkMetricData = sinkMetricData;
+ this.dirtySinkHelper = dirtySinkHelper;
}
@Override
@@ -477,42 +494,16 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
if (response.hasFailures()) {
BulkItemResponse itemResponse;
Throwable failure;
- RestStatus restStatus;
- DocWriteRequest actionRequest;
-
+ int restStatus;
try {
for (int i = 0; i < response.getItems().length; i++) {
itemResponse = response.getItems()[i];
failure =
callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
if (failure != null) {
- restStatus = itemResponse.getFailure().getStatus();
- actionRequest = request.requests().get(i);
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1, 0);
- }
- if (restStatus == null) {
- if (actionRequest instanceof ActionRequest) {
- failureHandler.onFailure(
- (ActionRequest) actionRequest,
- failure,
- -1,
- failureRequestIndexer);
- } else {
- throw new UnsupportedOperationException(
- "The sink currently only supports
ActionRequests");
- }
- } else {
- if (actionRequest instanceof ActionRequest) {
- failureHandler.onFailure(
- (ActionRequest) actionRequest,
- failure,
- restStatus.getStatus(),
- failureRequestIndexer);
- } else {
- throw new UnsupportedOperationException(
- "The sink currently only supports
ActionRequests");
- }
- }
+ restStatus = itemResponse.getFailure().getStatus()
!= null
+ ?
itemResponse.getFailure().getStatus().getStatus()
+ : -1;
+ handleFailure(request.requests().get(i),
restStatus, failure);
}
}
} catch (Throwable t) {
@@ -521,7 +512,6 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
failureThrowable.compareAndSet(null, t);
}
}
-
if (flushOnCheckpoint) {
numPendingRequests.getAndAdd(-request.numberOfActions());
}
@@ -530,27 +520,59 @@ public abstract class ElasticsearchSinkBase<T, C extends
AutoCloseable> extends
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable
failure) {
try {
- for (DocWriteRequest writeRequest : request.requests()) {
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1, 0);
- }
- if (writeRequest instanceof ActionRequest) {
- failureHandler.onFailure(
- (ActionRequest) writeRequest, failure, -1,
failureRequestIndexer);
- } else {
- throw new UnsupportedOperationException(
- "The sink currently only supports
ActionRequests");
- }
+ for (DocWriteRequest<?> writeRequest : request.requests()) {
+ handleFailure(writeRequest, -1, failure);
}
} catch (Throwable t) {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, t);
}
-
if (flushOnCheckpoint) {
numPendingRequests.getAndAdd(-request.numberOfActions());
}
}
+
+ private void handleFailure(DocWriteRequest<?> writeRequest, int restStatus, Throwable failure)
+ throws Throwable {
+ // Only supports dirty data sink when the failureHandler is IgnoringFailureHandler
+ if (failureHandler instanceof IgnoringFailureHandler) {
+ if (!(writeRequest instanceof ActionRequest)) {
+ LOGGER.error("The sink currently only supports ActionRequests");
+ dirtySinkHelper.invoke(writeRequest.id(), DirtyType.UNSUPPORTED_DATA_TYPE,
+ new UnsupportedOperationException("The sink currently only supports ActionRequests"));
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, writeRequest.id().getBytes(StandardCharsets.UTF_8).length);
+ }
+ } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
+ String dirtyData;
+ if (writeRequest instanceof UpdateRequest) {
+ dirtyData = ((UpdateRequest) writeRequest).doc().source().utf8ToString();
+ } else if (writeRequest instanceof IndexRequest) {
+ dirtyData = ((IndexRequest) writeRequest).source().utf8ToString();
+ } else {
+ dirtyData = writeRequest.id();
+ }
+ LOGGER.error(String.format("Elasticsearch parse exception, raw data: %s", dirtyData), failure);
+ dirtySinkHelper.invoke(dirtyData, DirtyType.DOCUMENT_PARSE_ERROR, failure);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, dirtyData.getBytes(StandardCharsets.UTF_8).length);
+ }
+ } else {
+ throw failure;
+ }
+ } else {
+ if (writeRequest instanceof ActionRequest) {
+ failureHandler.onFailure(
+ (ActionRequest) writeRequest,
+ failure,
+ restStatus,
+ failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException(
+ "The sink currently only supports ActionRequests");
+ }
+ }
+ }
}
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 4ff80bcca..fe605e3c1 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -26,13 +26,14 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.inlong.sort.base.metric.SinkMetricData;
@@ -42,9 +43,12 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.Function;
@@ -57,6 +61,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = LoggerFactory.getLogger(RowElasticsearchSinkFunction.class);
+
private final IndexGenerator indexGenerator;
private final String docType;
private final SerializationSchema<RowData> serializationSchema;
@@ -73,6 +79,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
private transient RuntimeContext runtimeContext;
private SinkMetricData sinkMetricData;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
public RowElasticsearchSinkFunction(
IndexGenerator indexGenerator,
@@ -83,7 +90,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
Function<RowData, String> createKey,
@Nullable Function<RowData, String> createRouting,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtySinkHelper<Object> dirtySinkHelper) {
this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
this.docType = docType;
this.serializationSchema =
Preconditions.checkNotNull(serializationSchema);
@@ -93,6 +101,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
this.createRouting = createRouting;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.dirtySinkHelper = dirtySinkHelper;
}
@Override
@@ -146,55 +155,94 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
@Override
public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
+ final byte[] document;
+ try {
+ document = serializationSchema.serialize(element);
+ } catch (Exception e) {
+ LOGGER.error(String.format("Serialize error, raw data: %s", element), e);
+ dirtySinkHelper.invoke(element, DirtyType.SERIALIZE_ERROR, e);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, element.toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ return;
+ }
+ final String key;
+ try {
+ key = createKey.apply(element);
+ } catch (Exception e) {
+ LOGGER.error(String.format("Generate index id error, raw data: %s", element), e);
+ dirtySinkHelper.invoke(element, DirtyType.INDEX_ID_GENERATE_ERROR, e);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, document.length);
+ }
+ return;
+ }
+ final String index;
+ try {
+ index = indexGenerator.generate(element);
+ } catch (Exception e) {
+ LOGGER.error(String.format("Generate index error, raw data: %s", element), e);
+ dirtySinkHelper.invoke(element, DirtyType.INDEX_GENERATE_ERROR, e);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, document.length);
+ }
+ return;
+ }
+ addDocument(element, key, index, document, indexer);
+ }
+
+ private void addDocument(RowData element, String key, String index, byte[] document, RequestIndexer indexer) {
+ DocWriteRequest<?> request;
switch (element.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
- processUpsert(element, indexer);
+ if (key != null) {
+ request = requestFactory.createUpdateRequest(index, docType, key, contentType, document);
+ if (addRouting(request, element, document)) {
+ indexer.add((UpdateRequest) request);
+ sendMetrics(document);
+ }
+ } else {
+ request = requestFactory.createIndexRequest(index, docType, key, contentType, document);
+ if (addRouting(request, element, document)) {
+ indexer.add((IndexRequest) request);
+ sendMetrics(document);
+ }
+ }
break;
case UPDATE_BEFORE:
case DELETE:
- processDelete(element, indexer);
+ request = requestFactory.createDeleteRequest(index, docType, key);
+ if (addRouting(request, element, document)) {
+ indexer.add((DeleteRequest) request);
+ sendMetrics(document);
+ }
break;
default:
- throw new TableException("Unsupported message kind: " + element.getRowKind());
+ LOGGER.error(String.format("The type of element should be 'RowData' only, raw data: %s", element));
+ dirtySinkHelper.invoke(element, DirtyType.UNSUPPORTED_DATA_TYPE,
+ new RuntimeException("The type of element should be 'RowData' only."));
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, document.length);
+ }
}
}
- private void processUpsert(RowData row, RequestIndexer indexer) {
- final byte[] document = serializationSchema.serialize(row);
- final String key = createKey.apply(row);
- sendMetrics(document);
- if (key != null) {
- final UpdateRequest updateRequest =
- requestFactory.createUpdateRequest(
- indexGenerator.generate(row), docType, key, contentType, document);
- addRouting(updateRequest, row);
- indexer.add(updateRequest);
- } else {
- final IndexRequest indexRequest =
- requestFactory.createIndexRequest(
- indexGenerator.generate(row), docType, key, contentType, document);
- addRouting(indexRequest, row);
- indexer.add(indexRequest);
- }
- }
-
- private void processDelete(RowData row, RequestIndexer indexer) {
- // the serialization is just for metrics
- final byte[] document = serializationSchema.serialize(row);
- sendMetrics(document);
- final String key = createKey.apply(row);
- final DeleteRequest deleteRequest =
- requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
- addRouting(deleteRequest, row);
- indexer.add(deleteRequest);
- }
-
- private void addRouting(DocWriteRequest<?> request, RowData row) {
+ private boolean addRouting(DocWriteRequest<?> request, RowData row, byte[] document) {
if (null != createRouting) {
- String routing = createRouting.apply(row);
- request.routing(routing);
+ try {
+ String routing = createRouting.apply(row);
+ request.routing(routing);
+ } catch (Exception e) {
+ LOGGER.error(String.format("Routing error, raw data: %s", row), e);
+ dirtySinkHelper.invoke(row, DirtyType.INDEX_ROUTING_ERROR, e);
+ if (sinkMetricData != null) {
+ sinkMetricData.invokeDirty(1, document.length);
+ }
+ return false;
+ }
}
+ return true;
}
@Override
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
index 9e7d4e74d..bf4ef0505 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.parser;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -74,8 +75,15 @@ public abstract class ElasticsearchSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo())));
CsvFormat csvFormat = new CsvFormat();
csvFormat.setDisableQuoteCharacter(true);
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+ "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&table=test");
return new ElasticsearchLoadNode("2", "kafka_output", fields,
relations, null, null,
- 2, null,
+ 2, properties,
"test", "http://localhost:9200",
"elastic", "my_password", null, "age", version);
}