This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cc9a7bda0 [INLONG-6792][Sort] Supports dirty data side-output for hbase sink (#6793)
cc9a7bda0 is described below

commit cc9a7bda02fc0fb18691430188b79fdee569d6ab
Author: yunqingmoswu <[email protected]>
AuthorDate: Fri Dec 9 15:54:01 2022 +0800

    [INLONG-6792][Sort] Supports dirty data side-output for hbase sink (#6793)
---
 .../sort/hbase/HBase2DynamicTableFactory.java      | 12 ++-
 .../sort/hbase/sink/HBaseDynamicTableSink.java     | 17 ++++-
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 88 +++++++++++++++-------
 .../sort/parser/HbaseLoadFlinkSqlParseTest.java    | 10 ++-
 4 files changed, 93 insertions(+), 34 deletions(-)

diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index 54566ebb0..12b8f3e04 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -31,6 +31,9 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
 
 import java.util.HashSet;
@@ -55,6 +58,7 @@ import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseWriteOptions;
 import static org.apache.flink.connector.hbase.options.HBaseOptions.validatePrimaryKey;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
@@ -91,7 +95,7 @@ public class HBase2DynamicTableFactory
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         TableFactoryHelper helper = createTableFactoryHelper(this, context);
-        helper.validateExcept(PROPERTIES_PREFIX);
+        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
 
         final ReadableConfig tableOptions = helper.getOptions();
 
@@ -107,9 +111,11 @@ public class HBase2DynamicTableFactory
         HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
         String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = tableOptions.get(INLONG_AUDIT);
-
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return new HBaseDynamicTableSink(
-                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inlongMetric, inlongAudit);
+                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral,
+                inlongMetric, inlongAudit, dirtyOptions, dirtySink);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
index 0f68fd811..2a4542706 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseDynamicTableSink.java
@@ -28,6 +28,10 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+
+import javax.annotation.Nullable;
 
 /** HBase table sink implementation. */
 @Internal
@@ -40,6 +44,8 @@ public class HBaseDynamicTableSink implements DynamicTableSink {
     private final String nullStringLiteral;
     private final String inlongMetric;
     private final String inlongAudit;
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
 
     public HBaseDynamicTableSink(
             String tableName,
@@ -48,7 +54,9 @@ public class HBaseDynamicTableSink implements DynamicTableSink {
             HBaseWriteOptions writeOptions,
             String nullStringLiteral,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.tableName = tableName;
         this.hbaseTableSchema = hbaseTableSchema;
         this.hbaseConf = hbaseConf;
@@ -56,6 +64,8 @@ public class HBaseDynamicTableSink implements DynamicTableSink {
         this.nullStringLiteral = nullStringLiteral;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -68,7 +78,7 @@ public class HBaseDynamicTableSink implements DynamicTableSink {
                         writeOptions.getBufferFlushMaxSizeInBytes(),
                         writeOptions.getBufferFlushMaxRows(),
                         writeOptions.getBufferFlushIntervalMillis(),
-                        inlongMetric, inlongAudit);
+                        inlongMetric, inlongAudit, dirtyOptions, dirtySink);
         return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism());
     }
 
@@ -80,7 +90,8 @@ public class HBaseDynamicTableSink implements DynamicTableSink {
     @Override
     public DynamicTableSink copy() {
         return new HBaseDynamicTableSink(
-                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral, inlongMetric, inlongAudit);
+                tableName, hbaseTableSchema, hbaseConf, writeOptions,
+                nullStringLiteral, inlongMetric, inlongAudit, dirtyOptions, dirtySink);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index e69243282..3724047b3 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -42,7 +43,12 @@ import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
@@ -51,6 +57,7 @@ import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Executors;
@@ -81,7 +88,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             BufferedMutator.ExceptionListener {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseSinkFunction.class);
 
     private final String hTableName;
     private final byte[] serializedConfig;
@@ -114,6 +121,8 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
     private transient volatile boolean closed = false;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
 
     public HBaseSinkFunction(
             String hTableName,
@@ -123,7 +132,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             long bufferFlushMaxMutations,
             long bufferFlushIntervalMillis,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.hTableName = hTableName;
         // Configuration is not serializable
         this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
@@ -133,11 +144,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
         this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
     public void open(Configuration parameters) throws Exception {
-        LOG.info("start open ...");
+        LOGGER.info("Start hbase sink function open ...");
         org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
         try {
             this.runtimeContext = getRuntimeContext();
@@ -153,6 +166,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             if (metricOption != null) {
                 sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
             }
+            if (dirtySink != null) {
+                dirtySink.open(parameters);
+            }
             this.mutationConverter.open();
             this.numPendingRequests = new AtomicLong(0);
 
@@ -184,13 +200,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
                                 TimeUnit.MILLISECONDS);
             }
         } catch (TableNotFoundException tnfe) {
-            LOG.error("The table " + hTableName + " not found ", tnfe);
+            LOGGER.error("The table " + hTableName + " not found ", tnfe);
             throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
         } catch (IOException ioe) {
-            LOG.error("Exception while creating connection to HBase.", ioe);
+            LOGGER.error("Exception while creating connection to HBase.", ioe);
             throw new RuntimeException("Cannot create connection to HBase.", ioe);
         }
-        LOG.info("end open.");
+        LOGGER.info("End hbase sink function open.");
     }
 
     private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
@@ -205,7 +221,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
 
         // do validation: check key option(s) in final runtime configuration
         if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
-            LOG.error(
+            LOGGER.error(
                     "Can not connect to HBase without {} configuration",
                     HConstants.ZOOKEEPER_QUORUM);
             throw new IOException(
@@ -220,29 +236,51 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
     private void checkErrorAndRethrow() {
         Throwable cause = failureThrowable.get();
         if (cause != null) {
-            LOG.error("An error occurred in HBaseSink.", cause);
-            failureThrowable.compareAndSet(cause, null);
+            LOGGER.error("An error occurred in HBaseSink.", cause);
+            throw new RuntimeException(cause);
         }
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public void invoke(T value, Context context) {
         checkErrorAndRethrow();
-        try {
-            RowData rowData = (RowData) value;
-            if (RowKind.UPDATE_BEFORE != rowData.getRowKind()) {
-                mutator.mutate(mutationConverter.convertToMutation(value));
-            }
-            rowSize++;
-            dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length;
-        } catch (Exception e) {
-            if (sinkMetricData != null) {
+        RowData rowData = (RowData) value;
+        if (RowKind.UPDATE_BEFORE != rowData.getRowKind()) {
+            Mutation mutation = null;
+            try {
+                mutation = Preconditions.checkNotNull(mutationConverter.convertToMutation(value));
+                rowSize++;
+                dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length;
+            } catch (Exception e) {
+                LOGGER.error("Convert to mutation error", e);
+                if (!dirtyOptions.ignoreDirty()) {
+                    throw new RuntimeException(e);
+                }
                 sinkMetricData.invokeDirty(1, value.toString().getBytes(StandardCharsets.UTF_8).length);
+                if (dirtySink != null) {
+                    DirtyData.Builder<Object> builder = DirtyData.builder();
+                    try {
+                        builder.setData(rowData)
+                                .setDirtyType(DirtyType.UNDEFINED)
+                                .setLabels(dirtyOptions.getLabels())
+                                .setLogTag(dirtyOptions.getLogTag())
+                                .setDirtyMessage(e.getMessage())
+                                .setIdentifier(dirtyOptions.getIdentifier());
+                        dirtySink.invoke(builder.build());
+                    } catch (Exception ex) {
+                        if (!dirtyOptions.ignoreSideOutputErrors()) {
+                            throw new RuntimeException(ex);
+                        }
+                        LOGGER.warn("Dirty sink failed", ex);
+                    }
+                }
+            }
+            try {
+                mutator.mutate(mutation);
+            } catch (Exception e) {
+                failureThrowable.compareAndSet(null, e);
             }
-            failureThrowable.compareAndSet(null, e);
         }
-
         // flush when the buffer number of mutations greater than the configured max size.
         if (bufferFlushMaxMutations > 0
                 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
@@ -258,10 +296,6 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             }
             resetStateAfterFlush();
         } catch (Exception e) {
-            if (sinkMetricData != null) {
-                sinkMetricData.invokeDirty(rowSize, dataSize);
-            }
-            resetStateAfterFlush();
             // fail the sink and skip the rest of the items
             // if the failure handler decides to throw an exception
             failureThrowable.compareAndSet(null, e);
@@ -288,7 +322,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             try {
                 mutator.close();
             } catch (IOException e) {
-                LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
+                LOGGER.warn("Exception occurs while closing HBase BufferedMutator.", e);
             }
             this.mutator = null;
         }
@@ -297,7 +331,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
             try {
                 connection.close();
             } catch (IOException e) {
-                LOG.warn("Exception occurs while closing HBase Connection.", e);
+                LOGGER.warn("Exception occurs while closing HBase Connection.", e);
             }
             this.connection = null;
         }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
index 368976b9d..b39be7c48 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -71,6 +72,13 @@ public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
      * @return hbase load node
      */
     private HbaseLoadNode buildHbaseLoadNode() {
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=default&table=mytable");
         return new HbaseLoadNode("2", "test_hbase",
                 Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()), new FieldInfo("cf:name",
                         new StringFormatInfo())),
@@ -78,7 +86,7 @@ public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
                         new FieldInfo("cf:age", new LongFormatInfo())),
                         new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("cf:name", new StringFormatInfo()))),
-                null, null, 1, null, "mytable",
+                null, null, 1, properties, "mytable",
                 "default",
                 "localhost:2181", "MD5(`name`)", null, null, null, null);
     }

Reply via email to