This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cc9a7bda0 [INLONG-6792][Sort] Supports dirty data side-output for
hbase sink (#6793)
cc9a7bda0 is described below
commit cc9a7bda02fc0fb18691430188b79fdee569d6ab
Author: yunqingmoswu <[email protected]>
AuthorDate: Fri Dec 9 15:54:01 2022 +0800
[INLONG-6792][Sort] Supports dirty data side-output for hbase sink (#6793)
---
.../sort/hbase/HBase2DynamicTableFactory.java | 12 ++-
.../sort/hbase/sink/HBaseDynamicTableSink.java | 17 ++++-
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 88 +++++++++++++++-------
.../sort/parser/HbaseLoadFlinkSqlParseTest.java | 10 ++-
4 files changed, 93 insertions(+), 34 deletions(-)
diff --git
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index 54566ebb0..12b8f3e04 100644
---
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -31,6 +31,9 @@ import
org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.hadoop.conf.Configuration;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
import java.util.HashSet;
@@ -55,6 +58,7 @@ import static
org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseWrit
import static
org.apache.flink.connector.hbase.options.HBaseOptions.validatePrimaryKey;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
import static
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -91,7 +95,7 @@ public class HBase2DynamicTableFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
- helper.validateExcept(PROPERTIES_PREFIX);
+ helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
final ReadableConfig tableOptions = helper.getOptions();
@@ -107,9 +111,11 @@ public class HBase2DynamicTableFactory
HBaseTableSchema hbaseSchema =
HBaseTableSchema.fromTableSchema(tableSchema);
String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = tableOptions.get(INLONG_AUDIT);
-
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return new HBaseDynamicTableSink(
-                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inlongMetric, inlongAudit);
+                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral,
+                inlongMetric, inlongAudit, dirtyOptions, dirtySink);
}
@Override
diff --git
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
index 0f68fd811..2a4542706 100644
---
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
+++
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
@@ -28,6 +28,10 @@ import
org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+
+import javax.annotation.Nullable;
/** HBase table sink implementation. */
@Internal
@@ -40,6 +44,8 @@ public class HBaseDynamicTableSink implements
DynamicTableSink {
private final String nullStringLiteral;
private final String inlongMetric;
private final String inlongAudit;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
public HBaseDynamicTableSink(
String tableName,
@@ -48,7 +54,9 @@ public class HBaseDynamicTableSink implements
DynamicTableSink {
HBaseWriteOptions writeOptions,
String nullStringLiteral,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.tableName = tableName;
this.hbaseTableSchema = hbaseTableSchema;
this.hbaseConf = hbaseConf;
@@ -56,6 +64,8 @@ public class HBaseDynamicTableSink implements
DynamicTableSink {
this.nullStringLiteral = nullStringLiteral;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -68,7 +78,7 @@ public class HBaseDynamicTableSink implements
DynamicTableSink {
writeOptions.getBufferFlushMaxSizeInBytes(),
writeOptions.getBufferFlushMaxRows(),
writeOptions.getBufferFlushIntervalMillis(),
- inlongMetric, inlongAudit);
+ inlongMetric, inlongAudit, dirtyOptions, dirtySink);
return SinkFunctionProvider.of(sinkFunction,
writeOptions.getParallelism());
}
@@ -80,7 +90,8 @@ public class HBaseDynamicTableSink implements
DynamicTableSink {
@Override
public DynamicTableSink copy() {
return new HBaseDynamicTableSink(
-                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral, inlongMetric, inlongAudit);
+                tableName, hbaseTableSchema, hbaseConf, writeOptions,
+                nullStringLiteral, inlongMetric, inlongAudit, dirtyOptions, dirtySink);
}
@Override
diff --git
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index e69243282..3724047b3 100644
---
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -34,6 +34,7 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -42,7 +43,12 @@ import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
@@ -51,6 +57,7 @@ import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
@@ -81,7 +88,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
BufferedMutator.ExceptionListener {
private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseSinkFunction.class);
private final String hTableName;
private final byte[] serializedConfig;
@@ -114,6 +121,8 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
private transient volatile boolean closed = false;
private Long dataSize = 0L;
private Long rowSize = 0L;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
public HBaseSinkFunction(
String hTableName,
@@ -123,7 +132,9 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
long bufferFlushMaxMutations,
long bufferFlushIntervalMillis,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.hTableName = hTableName;
// Configuration is not serializable
this.serializedConfig =
HBaseConfigurationUtil.serializeConfiguration(conf);
@@ -133,11 +144,13 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
public void open(Configuration parameters) throws Exception {
- LOG.info("start open ...");
+ LOGGER.info("Start hbase sink function open ...");
org.apache.hadoop.conf.Configuration config =
prepareRuntimeConfiguration();
try {
this.runtimeContext = getRuntimeContext();
@@ -153,6 +166,9 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
if (metricOption != null) {
sinkMetricData = new SinkMetricData(metricOption,
runtimeContext.getMetricGroup());
}
+ if (dirtySink != null) {
+ dirtySink.open(parameters);
+ }
this.mutationConverter.open();
this.numPendingRequests = new AtomicLong(0);
@@ -184,13 +200,13 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
TimeUnit.MILLISECONDS);
}
} catch (TableNotFoundException tnfe) {
- LOG.error("The table " + hTableName + " not found ", tnfe);
+ LOGGER.error("The table " + hTableName + " not found ", tnfe);
throw new RuntimeException("HBase table '" + hTableName + "' not
found.", tnfe);
} catch (IOException ioe) {
- LOG.error("Exception while creating connection to HBase.", ioe);
+ LOGGER.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.",
ioe);
}
- LOG.info("end open.");
+ LOGGER.info("End hbase sink function open.");
}
private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration()
throws IOException {
@@ -205,7 +221,7 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
// do validation: check key option(s) in final runtime configuration
if
(StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM)))
{
- LOG.error(
+ LOGGER.error(
"Can not connect to HBase without {} configuration",
HConstants.ZOOKEEPER_QUORUM);
throw new IOException(
@@ -220,29 +236,51 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
private void checkErrorAndRethrow() {
Throwable cause = failureThrowable.get();
if (cause != null) {
- LOG.error("An error occurred in HBaseSink.", cause);
- failureThrowable.compareAndSet(cause, null);
+ LOGGER.error("An error occurred in HBaseSink.", cause);
+ throw new RuntimeException(cause);
}
}
- @SuppressWarnings("rawtypes")
@Override
public void invoke(T value, Context context) {
checkErrorAndRethrow();
- try {
- RowData rowData = (RowData) value;
- if (RowKind.UPDATE_BEFORE != rowData.getRowKind()) {
- mutator.mutate(mutationConverter.convertToMutation(value));
- }
- rowSize++;
- dataSize = dataSize +
value.toString().getBytes(StandardCharsets.UTF_8).length;
- } catch (Exception e) {
- if (sinkMetricData != null) {
+ RowData rowData = (RowData) value;
+ if (RowKind.UPDATE_BEFORE != rowData.getRowKind()) {
+            Mutation mutation = null;
+            try {
+                mutation = Preconditions.checkNotNull(mutationConverter.convertToMutation(value));
+                rowSize++;
+                dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length;
+ } catch (Exception e) {
+ LOGGER.error("Convert to mutation error", e);
+ if (!dirtyOptions.ignoreDirty()) {
+ throw new RuntimeException(e);
+ }
sinkMetricData.invokeDirty(1,
value.toString().getBytes(StandardCharsets.UTF_8).length);
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(rowData)
+ .setDirtyType(DirtyType.UNDEFINED)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setDirtyMessage(e.getMessage())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOGGER.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+ try {
+ mutator.mutate(mutation);
+ } catch (Exception e) {
+ failureThrowable.compareAndSet(null, e);
}
- failureThrowable.compareAndSet(null, e);
}
-
// flush when the buffer number of mutations greater than the
configured max size.
if (bufferFlushMaxMutations > 0
&& numPendingRequests.incrementAndGet() >=
bufferFlushMaxMutations) {
@@ -258,10 +296,6 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
}
resetStateAfterFlush();
} catch (Exception e) {
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(rowSize, dataSize);
- }
- resetStateAfterFlush();
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, e);
@@ -288,7 +322,7 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
try {
mutator.close();
} catch (IOException e) {
- LOG.warn("Exception occurs while closing HBase
BufferedMutator.", e);
+ LOGGER.warn("Exception occurs while closing HBase
BufferedMutator.", e);
}
this.mutator = null;
}
@@ -297,7 +331,7 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
try {
connection.close();
} catch (IOException e) {
- LOG.warn("Exception occurs while closing HBase Connection.",
e);
+ LOGGER.warn("Exception occurs while closing HBase
Connection.", e);
}
this.connection = null;
}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
index 368976b9d..b39be7c48 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -71,6 +72,13 @@ public class HbaseLoadFlinkSqlParseTest extends
AbstractTestBase {
* @return hbase load node
*/
private HbaseLoadNode buildHbaseLoadNode() {
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=default&table=mytable");
return new HbaseLoadNode("2", "test_hbase",
Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()),
new FieldInfo("cf:name",
new StringFormatInfo())),
@@ -78,7 +86,7 @@ public class HbaseLoadFlinkSqlParseTest extends
AbstractTestBase {
new FieldInfo("cf:age", new LongFormatInfo())),
new FieldRelation(new FieldInfo("name", new
StringFormatInfo()),
new FieldInfo("cf:name", new
StringFormatInfo()))),
- null, null, 1, null, "mytable",
+ null, null, 1, properties, "mytable",
"default",
"localhost:2181", "MD5(`name`)", null, null, null, null);
}