This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new bad24da [feature]add arrow type for streamload (#265)
bad24da is described below
commit bad24da688fce7001825f0e2d8b0385176c5423a
Author: wuwenchi <[email protected]>
AuthorDate: Wed Dec 27 11:03:34 2023 +0800
[feature]add arrow type for streamload (#265)
---
.github/workflows/build-extension.yml | 23 +++-
flink-doris-connector/build.sh | 4 +-
flink-doris-connector/pom.xml | 21 ++-
.../doris/flink/sink/batch/BatchRecordBuffer.java | 4 +-
.../flink/sink/batch/DorisBatchStreamLoad.java | 16 ++-
.../doris/flink/sink/batch/DorisBatchWriter.java | 29 ++--
.../doris/flink/sink/writer/DorisStreamLoad.java | 23 ++--
.../doris/flink/sink/writer/DorisWriter.java | 24 ++--
.../doris/flink/sink/writer/LoadConstants.java | 1 +
.../flink/sink/writer/serializer/DorisRecord.java | 3 +
.../writer/serializer/DorisRecordSerializer.java | 8 ++
.../sink/writer/serializer/RowDataSerializer.java | 80 ++++++++++-
.../runtime/arrow/serializers/ArrowSerializer.java | 153 +++++++++++++++++++++
.../flink/sink/writer/TestRowDataSerializer.java | 37 +++++
14 files changed, 376 insertions(+), 50 deletions(-)
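
For context, the new format is selected through the existing stream load properties (LoadConstants.FORMAT_KEY = "format", with the new value LoadConstants.ARROW = "arrow"), which DorisBatchStreamLoad and DorisStreamLoad read back via executionOptions.getStreamLoadProp(). A minimal, illustrative Java sketch of wiring that up; it assumes the connector's DorisExecutionOptions builder in org.apache.doris.flink.cfg and its setStreamLoadProp method, which are not part of this diff:

    import java.util.Properties;

    import org.apache.doris.flink.cfg.DorisExecutionOptions;

    public class ArrowFormatOptionSketch {
        public static void main(String[] args) {
            // "format" is LoadConstants.FORMAT_KEY; "arrow" is the value added by this commit.
            Properties props = new Properties();
            props.setProperty("format", "arrow");
            // Assumed builder API; the sink reads the property back via getStreamLoadProp()
            // and skips the CSV line delimiter when the format is arrow.
            DorisExecutionOptions executionOptions =
                    DorisExecutionOptions.builder().setStreamLoadProp(props).build();
            System.out.println(executionOptions.getStreamLoadProp());
        }
    }
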
diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml
index 2038d21..7259bb4 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -42,5 +42,26 @@ jobs:
run: |
cd flink-doris-connector && mvn clean package \
-Dflink.version=1.15.0 \
- -Dflink.minor.version=1.15
+ -Dflink.minor.version=1.15 \
+ -Dflink.python.id=flink-python_2.12
+ - name: Build flink connector 1.16
+ run: |
+ cd flink-doris-connector && mvn clean package \
+ -Dflink.version=1.16.0 \
+ -Dflink.minor.version=1.16 \
+ -Dflink.python.id=flink-python
+
+ - name: Build flink connector 1.17
+ run: |
+ cd flink-doris-connector && mvn clean package \
+ -Dflink.version=1.17.0 \
+ -Dflink.minor.version=1.17 \
+ -Dflink.python.id=flink-python
+
+ - name: Build flink connector 1.18
+ run: |
+ cd flink-doris-connector && mvn clean package \
+ -Dflink.version=1.18.0 \
+ -Dflink.minor.version=1.18 \
+ -Dflink.python.id=flink-python
diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh
index a646f58..1c67f2b 100755
--- a/flink-doris-connector/build.sh
+++ b/flink-doris-connector/build.sh
@@ -142,8 +142,10 @@ selectFlink() {
FLINK_VERSION=0
selectFlink
flinkVer=$?
+FLINK_PYTHON_ID="flink-python"
if [ ${flinkVer} -eq 1 ]; then
FLINK_VERSION="1.15.0"
+ FLINK_PYTHON_ID="flink-python_2.12"
elif [ ${flinkVer} -eq 2 ]; then
FLINK_VERSION="1.16.0"
elif [ ${flinkVer} -eq 3 ]; then
@@ -160,7 +162,7 @@ FLINK_MAJOR_VERSION=0
echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}"
echo_g " build starting..."
-${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@"
+${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} -Dflink.python.id=${FLINK_PYTHON_ID} "$@"
EXIT_CODE=$?
if [ $EXIT_CODE -eq 0 ]; then
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index a54562e..aedb352 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -71,8 +71,9 @@ under the License.
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
<flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
+ <flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
- <arrow.version>5.0.0</arrow.version>
+ <arrow.version>13.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -84,7 +85,6 @@ under the License.
<spotless.version>2.4.2</spotless.version>
<httpcomponents.version>4.5.13</httpcomponents.version>
<commons-codec.version>1.15</commons-codec.version>
- <netty.version>4.1.77.Final</netty.version>
<fasterxml.version>2.13.3</fasterxml.version>
<guava.version>31.1-jre</guava.version>
<slf4j.version>1.7.25</slf4j.version>
@@ -137,6 +137,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${flink.python.id}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
@@ -195,19 +202,9 @@ under the License.
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 297cb18..df40e7a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -57,7 +57,7 @@ public class BatchRecordBuffer {
ensureCapacity(record.length);
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
- } else {
+ } else if (lineDelimiter != null) {
this.buffer.put(this.lineDelimiter);
}
this.buffer.put(record);
@@ -67,7 +67,7 @@ public class BatchRecordBuffer {
@VisibleForTesting
public void ensureCapacity(int length) {
- int lineDelimiterSize = this.lineDelimiter.length;
+ int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
if (buffer.remaining() - lineDelimiterSize >= length) {
return;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 7ca0cda..f32ce2c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -61,6 +61,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
@@ -105,10 +108,15 @@ public class DorisBatchStreamLoad implements Serializable {
this.password = dorisOptions.getPassword();
this.loadProps = executionOptions.getStreamLoadProp();
this.labelGenerator = labelGenerator;
- this.lineDelimiter =
- EscapeHandler.escapeString(
- loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
- .getBytes();
+ if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
+ this.lineDelimiter = null;
+ } else {
+ this.lineDelimiter =
+ EscapeHandler.escapeString(
+ loadProps.getProperty(
+ LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
+ .getBytes();
+ }
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 43aff7f..6a6576c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -77,14 +77,15 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
this.dorisReadOptions = dorisReadOptions;
this.executionOptions = executionOptions;
this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+ serializer.initial();
}
public void initializeLoad() throws IOException {
this.batchStreamLoad =
new DorisBatchStreamLoad(
dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
- // when uploading data in streaming mode,
- // we need to regularly detect whether there areexceptions.
+ // when uploading data in streaming mode, we need to regularly detect whether there are
+ // exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}
@@ -101,13 +102,24 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
@Override
public void write(IN in, Context context) throws IOException, InterruptedException {
checkFlushException();
- String db = this.database;
- String tbl = this.table;
- DorisRecord record = serializer.serialize(in);
+ writeOneDorisRecord(serializer.serialize(in));
+ }
+
+ @Override
+ public void flush(boolean flush) throws IOException, InterruptedException {
+ checkFlushException();
+ writeOneDorisRecord(serializer.flush());
+ LOG.info("checkpoint flush triggered.");
+ batchStreamLoad.flush(null, true);
+ }
+
+ public void writeOneDorisRecord(DorisRecord record) throws InterruptedException {
if (record == null || record.getRow() == null) {
// ddl or value is null
return;
}
+ String db = this.database;
+ String tbl = this.table;
// multi table load
if (record.getTableIdentifier() != null) {
db = record.getDatabase();
@@ -116,13 +128,6 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
batchStreamLoad.writeRecord(db, tbl, record.getRow());
}
- @Override
- public void flush(boolean flush) throws IOException, InterruptedException {
- checkFlushException();
- LOG.info("checkpoint flush triggered.");
- batchStreamLoad.flush(null, true);
- }
-
@Override
public void close() throws Exception {
LOG.info("DorisBatchWriter Close");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 30ff365..b095eb9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -54,6 +54,9 @@ import java.util.regex.Matcher;
import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
@@ -115,11 +118,15 @@ public class DorisStreamLoad implements Serializable {
executionOptions.getBufferSize(),
executionOptions.getBufferCount(),
executionOptions.isUseCache());
- lineDelimiter =
- EscapeHandler.escapeString(
- streamLoadProp.getProperty(
- LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
- .getBytes();
+ if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
+ lineDelimiter = null;
+ } else {
+ lineDelimiter =
+ EscapeHandler.escapeString(
+ streamLoadProp.getProperty(
+ LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
+ .getBytes();
+ }
loadBatchFirstRecord = true;
}
@@ -157,8 +164,8 @@ public class DorisStreamLoad implements Serializable {
LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
while (true) {
try {
- // TODO: According to label abort txn. Currently,
- // it can only be aborted based on txnid,
+ // TODO: According to label abort txn. Currently, it can only be aborted based on
+ // txnid,
// so we must first request a streamload based on the label to get the txnid.
String label = labelGenerator.generateTableLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
@@ -218,7 +225,7 @@ public class DorisStreamLoad implements Serializable {
public void writeRecord(byte[] record) throws IOException {
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
- } else {
+ } else if (lineDelimiter != null) {
recordStream.write(lineDelimiter);
}
recordStream.write(record);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 3fec941..e8cc1ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -109,6 +109,7 @@ public class DorisWriter<IN>
this.globalLoading = false;
initializeLoad(state);
+ serializer.initial();
}
public void initializeLoad(Collection<DorisWriterState> state) {
@@ -123,8 +124,8 @@ public class DorisWriter<IN>
}
// get main work thread.
executorThread = Thread.currentThread();
- // when uploading data in streaming mode,
- // we need to regularly detect whether there are exceptions.
+ // when uploading data in streaming mode, we need to regularly detect whether there are
+ // exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
@@ -167,14 +168,23 @@ public class DorisWriter<IN>
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
- String tableKey = dorisOptions.getTableIdentifier();
+ writeOneDorisRecord(serializer.serialize(in));
+ }
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException, InterruptedException {
+ writeOneDorisRecord(serializer.flush());
+ }
+
+ public void writeOneDorisRecord(DorisRecord record) throws IOException {
- DorisRecord record = serializer.serialize(in);
if (record == null || record.getRow() == null) {
// ddl or value is null
return;
}
+
// multi table load
+ String tableKey = dorisOptions.getTableIdentifier();
if (record.getTableIdentifier() != null) {
tableKey = record.getTableIdentifier();
}
@@ -191,11 +201,6 @@ public class DorisWriter<IN>
streamLoader.writeRecord(record.getRow());
}
- @Override
- public void flush(boolean flush) throws IOException, InterruptedException {
- // No action is triggered, everything is in the precommit method
- }
-
@Override
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
// Verify whether data is written during a checkpoint
@@ -369,5 +374,6 @@ public class DorisWriter<IN>
dorisStreamLoad.close();
}
}
+ serializer.close();
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 7b0d1d0..2e5d29a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -27,6 +27,7 @@ public class LoadConstants {
public static final String FORMAT_KEY = "format";
public static final String JSON = "json";
public static final String CSV = "csv";
+ public static final String ARROW = "arrow";
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
index d15f07c..6a5bdde 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
@@ -20,6 +20,9 @@ package org.apache.doris.flink.sink.writer.serializer;
import java.io.Serializable;
public class DorisRecord implements Serializable {
+
+ public static DorisRecord empty = new DorisRecord();
+
private String database;
private String table;
private byte[] row;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
index c1135fa..5582ea5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
@@ -35,4 +35,12 @@ public interface DorisRecordSerializer<T> extends Serializable {
* @throws IOException
*/
DorisRecord serialize(T record) throws IOException;
+
+ default void initial() {}
+
+ default DorisRecord flush() {
+ return DorisRecord.empty;
+ }
+
+ default void close() throws Exception {}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
index 2615fb9..f7d9874 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
@@ -18,20 +18,29 @@
package org.apache.doris.flink.sink.writer.serializer;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
import org.apache.doris.flink.sink.EscapeHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
@@ -39,12 +48,18 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
/** Serializer for RowData. */
public class RowDataSerializer implements DorisRecordSerializer<RowData> {
+ private static final Logger LOG = LoggerFactory.getLogger(RowDataSerializer.class);
String[] fieldNames;
String type;
private ObjectMapper objectMapper;
private final String fieldDelimiter;
private final boolean enableDelete;
private final DorisRowConverter rowConverter;
+ private ArrowSerializer arrowSerializer;
+ ByteArrayOutputStream outputStream;
+ private final int arrowBatchCnt = 1000;
+ private int arrowWriteCnt = 0;
+ private final DataType[] dataTypes;
private RowDataSerializer(
String[] fieldNames,
@@ -59,9 +74,25 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
if (JSON.equals(type)) {
objectMapper = new ObjectMapper();
}
+ this.dataTypes = dataTypes;
this.rowConverter = new DorisRowConverter().setExternalConverter(dataTypes);
}
+ @Override
+ public void initial() {
+ if (ARROW.equals(type)) {
+ LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
+ RowType rowType = RowType.of(logicalTypes, fieldNames);
+ arrowSerializer = new ArrowSerializer(rowType, rowType);
+ outputStream = new ByteArrayOutputStream();
+ try {
+ arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);
+ } catch (Exception e) {
+ throw new RuntimeException("failed to open arrow serializer:", e);
+ }
+ }
+ }
+
@Override
public DorisRecord serialize(RowData record) throws IOException {
int maxIndex = Math.min(record.getArity(), fieldNames.length);
@@ -70,12 +101,54 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
valString = buildJsonString(record, maxIndex);
} else if (CSV.equals(type)) {
valString = buildCSVString(record, maxIndex);
+ } else if (ARROW.equals(type)) {
+ arrowWriteCnt += 1;
+ arrowSerializer.write(record);
+ if (arrowWriteCnt < arrowBatchCnt) {
+ return DorisRecord.empty;
+ }
+ return arrowToDorisRecord();
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
return DorisRecord.of(valString.getBytes(StandardCharsets.UTF_8));
}
+ @Override
+ public DorisRecord flush() {
+ if (JSON.equals(type) || CSV.equals(type)) {
+ return DorisRecord.empty;
+ } else if (ARROW.equals(type)) {
+ return arrowToDorisRecord();
+ } else {
+ throw new IllegalArgumentException("The type " + type + " is not supported!");
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (ARROW.equals(type)) {
+ arrowSerializer.close();
+ }
+ }
+
+ public DorisRecord arrowToDorisRecord() {
+ if (arrowWriteCnt == 0) {
+ return DorisRecord.empty;
+ }
+ arrowWriteCnt = 0;
+ try {
+ arrowSerializer.finishCurrentBatch();
+ byte[] bytes = outputStream.toByteArray();
+ outputStream.reset();
+ arrowSerializer.resetWriter();
+ return DorisRecord.of(bytes);
+ } catch (Exception e) {
+ LOG.error("Failed to convert arrow batch:", e);
+ }
+ return DorisRecord.empty;
+ }
+
public String buildJsonString(RowData record, int maxIndex) throws IOException {
int fieldIndex = 0;
Map<String, String> valueMap = new HashMap<>();
@@ -155,9 +228,14 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
public RowDataSerializer build() {
Preconditions.checkState(
- CSV.equals(type) && fieldDelimiter != null || JSON.equals(type));
+ CSV.equals(type) && fieldDelimiter != null
+ || JSON.equals(type)
+ || ARROW.equals(type));
Preconditions.checkNotNull(dataTypes);
Preconditions.checkNotNull(fieldNames);
+ if (ARROW.equals(type)) {
+ Preconditions.checkArgument(!deletable);
+ }
return new RowDataSerializer(fieldNames, dataTypes, type, fieldDelimiter, deletable);
}
}
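
The arrow path above buffers rows into an Arrow batch (arrowBatchCnt = 1000) and only emits a DorisRecord when the batch fills or when flush() is called at a checkpoint. A rough, self-contained Java sketch of that lifecycle, mirroring the new test further down; field names, types and values are illustrative only:

    import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
    import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.DataType;

    public class ArrowSerializeSketch {
        public static void main(String[] args) throws Exception {
            String[] fieldNames = {"id", "name", "weight"};
            DataType[] dataTypes = {DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()};
            RowDataSerializer serializer =
                    RowDataSerializer.builder()
                            .setFieldNames(fieldNames)
                            .setFieldType(dataTypes)
                            .setType("arrow")
                            .enableDelete(false) // the builder rejects arrow together with the delete sign
                            .build();
            serializer.initial(); // opens the ArrowSerializer and its output stream
            GenericRowData row = GenericRowData.of(3, StringData.fromString("test"), 60.2);
            serializer.serialize(row); // returns DorisRecord.empty until 1000 rows are buffered
            DorisRecord batch = serializer.flush(); // forces the pending rows into one arrow batch
            byte[] arrowBytes = batch.getRow(); // bytes handed to stream load with format=arrow
            System.out.println("arrow batch size: " + arrowBytes.length);
            serializer.close(); // releases the arrow allocator resources
        }
    }
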
diff --git a/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
new file mode 100644
index 0000000..29f809f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.serializers;
+
+import org.apache.flink.api.python.shaded.org.apache.arrow.memory.BufferAllocator;
+import org.apache.flink.api.python.shaded.org.apache.arrow.memory.RootAllocator;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * this code is copied from flink-python, and modified finishCurrentBatch to add end operation.
+ *
+ * <p>The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow
+ * bytes.
+ */
+public final class ArrowSerializer {
+
+ static {
+ ArrowUtils.checkArrowUsable();
+ }
+
+ /** The input RowType. */
+ protected final RowType inputType;
+
+ /** The output RowType. */
+ protected final RowType outputType;
+
+ /** Allocator which is used for byte buffer allocation. */
+ private transient BufferAllocator allocator;
+
+ /** Reader which is responsible for deserialize the Arrow format data to the Flink rows. */
+ private transient ArrowReader arrowReader;
+
+ /**
+ * Reader which is responsible for convert the execution result from byte array to arrow format.
+ */
+ private transient ArrowStreamReader arrowStreamReader;
+
+ /**
+ * Container that holds a set of vectors for the input elements to be sent to the Python worker.
+ */
+ transient VectorSchemaRoot rootWriter;
+
+ /** Writer which is responsible for serialize the input elements to arrow format. */
+ private transient ArrowWriter<RowData> arrowWriter;
+
+ /** Writer which is responsible for convert the arrow format data into byte array. */
+ private transient ArrowStreamWriter arrowStreamWriter;
+
+ /** Reusable InputStream used to holding the execution results to be deserialized. */
+ private transient InputStream bais;
+
+ /** Reusable OutputStream used to holding the serialized input elements. */
+ private transient OutputStream baos;
+
+ public ArrowSerializer(RowType inputType, RowType outputType) {
+ this.inputType = inputType;
+ this.outputType = outputType;
+ }
+
+ public void open(InputStream bais, OutputStream baos) throws Exception {
+ this.bais = bais;
+ this.baos = baos;
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ arrowStreamReader = new ArrowStreamReader(bais, allocator);
+
+ rootWriter = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(inputType), allocator);
+ arrowWriter = createArrowWriter();
+ arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+ arrowStreamWriter.start();
+ }
+
+ public int load() throws IOException {
+ arrowStreamReader.loadNextBatch();
+ VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
+ if (arrowReader == null) {
+ arrowReader = createArrowReader(root);
+ }
+ return root.getRowCount();
+ }
+
+ public RowData read(int i) {
+ return arrowReader.read(i);
+ }
+
+ public void write(RowData element) {
+ arrowWriter.write(element);
+ }
+
+ public void close() throws Exception {
+ arrowStreamWriter.end();
+ arrowStreamReader.close();
+ rootWriter.close();
+ allocator.close();
+ }
+
+ /** Creates an {@link ArrowWriter}. */
+ public ArrowWriter<RowData> createArrowWriter() {
+ return ArrowUtils.createRowDataArrowWriter(rootWriter, inputType);
+ }
+
+ public ArrowReader createArrowReader(VectorSchemaRoot root) {
+ return ArrowUtils.createArrowReader(root, outputType);
+ }
+
+ /**
+ * Forces to finish the processing of the current batch of elements. It will serialize the batch
+ * of elements into one arrow batch.
+ */
+ public void finishCurrentBatch() throws Exception {
+ arrowWriter.finish();
+ arrowStreamWriter.writeBatch();
+ arrowStreamWriter.end();
+ arrowWriter.reset();
+ }
+
+ public void resetReader() throws IOException {
+ arrowReader = null;
+ arrowStreamReader.close();
+ arrowStreamReader = new ArrowStreamReader(bais, allocator);
+ }
+
+ public void resetWriter() throws IOException {
+ arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+ arrowStreamWriter.start();
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
index d1028fe..84e2971 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
@@ -19,8 +19,13 @@ package org.apache.doris.flink.sink.writer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -30,6 +35,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -132,4 +139,34 @@ public class TestRowDataSerializer {
Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.DELETE));
Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.UPDATE_BEFORE));
}
+
+ @Test
+ public void testArrowType() throws Exception {
+ RowDataSerializer serializer =
+ RowDataSerializer.builder()
+ .setFieldNames(fieldNames)
+ .setFieldType(dataTypes)
+ .setType("arrow")
+ .enableDelete(false)
+ .build();
+
+ // write data to binary
+ serializer.initial();
+ serializer.serialize(rowData);
+ byte[] serializedValue = serializer.flush().getRow();
+
+ // read data from binary
+ LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
+ RowType rowType = RowType.of(logicalTypes, fieldNames);
+ ArrowSerializer arrowSerializer = new ArrowSerializer(rowType, rowType);
+ ByteArrayInputStream input = new ByteArrayInputStream(serializedValue);
+ arrowSerializer.open(input, new ByteArrayOutputStream(0));
+ int cnt = arrowSerializer.load();
+ RowData data = arrowSerializer.read(0);
+
+ Assert.assertEquals(1, cnt);
+ Assert.assertEquals(3, data.getInt(0));
+ Assert.assertEquals("test", data.getString(1).toString());
+ Assert.assertEquals(60.2, data.getDouble(2), 0.001);
+ }
}