This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new a355ca9 [refactor] sink api refactor by FLIP-191 (#213)
a355ca9 is described below
commit a355ca9675181aa6dcd53017d09852c18870f44c
Author: wudi <[email protected]>
AuthorDate: Tue Oct 24 10:56:53 2023 +0800
[refactor] sink api refactor by FLIP-191 (#213)
---
.../org/apache/doris/flink/sink/DorisSink.java | 41 +++++++-------
.../doris/flink/sink/committer/DorisCommitter.java | 22 ++++----
.../doris/flink/sink/writer/DorisStreamLoad.java | 2 +
.../doris/flink/sink/writer/DorisWriter.java | 27 ++++++---
.../doris/flink/table/DorisDynamicTableSink.java | 3 +-
.../flink/sink/committer/MockCommitRequest.java | 64 ++++++++++++++++++++++
.../flink/sink/committer/TestDorisCommitter.java | 12 ++--
.../doris/flink/sink/writer/TestDorisWriter.java | 16 +++---
8 files changed, 132 insertions(+), 55 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index d1aee44..bc2d45c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -24,27 +24,28 @@ import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.committer.DorisCommitter;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.DorisWriter;
-import org.apache.flink.api.connector.sink.Committer;
import org.apache.doris.flink.sink.writer.DorisWriterState;
import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
+import java.util.Collection;
+import java.util.Collections;
/**
* Load data into Doris based on 2PC.
* see {@link DorisWriter} and {@link DorisCommitter}.
* @param <IN> type of record.
*/
-public class DorisSink<IN> implements Sink<IN, DorisCommittable,
DorisWriterState, DorisCommittable> {
+public class DorisSink<IN>
+ implements StatefulSink<IN, DorisWriterState>,
+ TwoPhaseCommittingSink<IN, DorisCommittable>{
private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
private final DorisOptions dorisOptions;
@@ -75,36 +76,32 @@ public class DorisSink<IN> implements Sink<IN,
DorisCommittable, DorisWriterStat
}
@Override
- public SinkWriter<IN, DorisCommittable, DorisWriterState>
createWriter(InitContext initContext, List<DorisWriterState> state) throws
IOException {
- DorisWriter<IN> dorisWriter = new DorisWriter<IN>(initContext, state,
serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
- dorisWriter.initializeLoad(state);
+ public DorisWriter<IN> createWriter(InitContext initContext) throws
IOException {
+ DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext,
Collections.emptyList(), serializer, dorisOptions, dorisReadOptions,
dorisExecutionOptions);
return dorisWriter;
}
@Override
- public Optional<SimpleVersionedSerializer<DorisWriterState>>
getWriterStateSerializer() {
- return Optional.of(new DorisWriterStateSerializer());
+ public Committer<DorisCommittable> createCommitter() throws IOException {
+ return new DorisCommitter(dorisOptions, dorisReadOptions,
dorisExecutionOptions.getMaxRetries());
}
@Override
- public Optional<Committer<DorisCommittable>> createCommitter() throws
IOException {
- return Optional.of(new DorisCommitter(dorisOptions, dorisReadOptions,
dorisExecutionOptions.getMaxRetries()));
+ public DorisWriter<IN> restoreWriter(InitContext initContext,
Collection<DorisWriterState> recoveredState) throws IOException {
+ DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext,
recoveredState, serializer, dorisOptions, dorisReadOptions,
dorisExecutionOptions);
+ return dorisWriter;
}
@Override
- public Optional<GlobalCommitter<DorisCommittable, DorisCommittable>>
createGlobalCommitter() throws IOException {
- return Optional.empty();
+ public SimpleVersionedSerializer<DorisWriterState>
getWriterStateSerializer() {
+ return new DorisWriterStateSerializer();
}
@Override
- public Optional<SimpleVersionedSerializer<DorisCommittable>>
getCommittableSerializer() {
- return Optional.of(new DorisCommittableSerializer());
+ public SimpleVersionedSerializer<DorisCommittable>
getCommittableSerializer() {
+ return new DorisCommittableSerializer();
}
- @Override
- public Optional<SimpleVersionedSerializer<DorisCommittable>>
getGlobalCommittableSerializer() {
- return Optional.empty();
- }
public static <IN> Builder<IN> builder() {
return new Builder<>();
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 0e19b0f..2a0fba0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -29,7 +29,7 @@ import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.ResponseUtil;
-import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink2.Committer;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -38,10 +38,10 @@ import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
-import java.util.Collections;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -49,7 +49,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
/**
* The committer to commit transaction.
*/
-public class DorisCommitter implements Committer<DorisCommittable> {
+public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(DorisCommitter.class);
private static final String commitPattern =
"http://%s/api/%s/_stream_load_2pc";
private final CloseableHttpClient httpClient;
@@ -75,11 +75,10 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
}
@Override
- public List<DorisCommittable> commit(List<DorisCommittable>
committableList) throws IOException {
- for (DorisCommittable committable : committableList) {
- commitTransaction(committable);
+ public void commit(Collection<CommitRequest<DorisCommittable>> requests)
throws IOException, InterruptedException {
+ for (CommitRequest<DorisCommittable> request: requests) {
+ commitTransaction(request.getCommittable());
}
- return Collections.emptyList();
}
private void commitTransaction(DorisCommittable committable) throws
IOException {
@@ -133,9 +132,12 @@ public class DorisCommitter implements
Committer<DorisCommittable> {
}
@Override
- public void close() throws Exception {
+ public void close() {
if (httpClient != null) {
- httpClient.close();
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ }
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index cda3c05..4fd9abf 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -139,6 +139,8 @@ public class DorisStreamLoad implements Serializable {
LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix,
chkID);
while (true) {
try {
+ // TODO: According to label abort txn. Currently, it can only
be aborted based on txnid,
+ // so we must first request a streamload based on the label
to get the txnid.
String label = labelGenerator.generateLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 230bad5..295a0be 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.sink.writer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -27,11 +28,10 @@ import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;
-
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.util.Preconditions;
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -56,7 +57,8 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
* Doris Writer will load data to doris.
* @param <IN>
*/
-public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable,
DorisWriterState> {
+public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN,
DorisWriterState>,
+ TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, DorisCommittable> {
private static final Logger LOG =
LoggerFactory.getLogger(DorisWriter.class);
private static final List<String> DORIS_SUCCESS_STATUS = new
ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private final long lastCheckpointId;
@@ -78,7 +80,7 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
private String currentLabel;
public DorisWriter(Sink.InitContext initContext,
- List<DorisWriterState> state,
+ Collection<DorisWriterState> state,
DorisRecordSerializer<IN> serializer,
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
@@ -100,9 +102,11 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
this.executionOptions = executionOptions;
this.intervalTime = executionOptions.checkInterval();
this.loading = false;
+
+ initializeLoad(state);
}
- public void initializeLoad(List<DorisWriterState> state) throws
IOException {
+ public void initializeLoad(Collection<DorisWriterState> state) {
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ?
new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions,
dorisReadOptions, LOG));
@@ -144,7 +148,13 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
}
@Override
- public List<DorisCommittable> prepareCommit(boolean flush) throws
IOException {
+ public void flush(boolean flush) throws IOException, InterruptedException {
+
+ }
+
+
+ @Override
+ public Collection<DorisCommittable> prepareCommit() throws IOException,
InterruptedException {
if(!loading){
//There is no data during the entire checkpoint period
return Collections.emptyList();
@@ -246,4 +256,5 @@ public class DorisWriter<IN> implements SinkWriter<IN,
DorisCommittable, DorisWr
dorisStreamLoad.close();
}
}
+
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 06f2bfb..66d0227 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -27,7 +27,6 @@ import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -104,7 +103,7 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
- return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
+ return SinkV2Provider.of(dorisSinkBuilder.build(),
sinkParallelism);
}else{
DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder =
DorisBatchSink.builder();
dorisBatchSinkBuilder.setDorisOptions(options)
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java
new file mode 100644
index 0000000..d879e81
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.committer;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+/**
+ * Test double for {@link Committer.CommitRequest} wrapping a single committable.
+ *
+ * <p>Rather than silently discarding the committer's feedback, this mock records every
+ * signal it receives so tests can assert how a request was resolved: the failure passed to
+ * {@code signalFailedWithKnownReason}/{@code signalFailedWithUnknownReason}, whether a retry
+ * was requested, and whether the request was marked as already committed.
+ * {@link #updateAndRetryLater(Object)} replaces the wrapped committable, mirroring the
+ * contract of the Flink interface.
+ *
+ * <p>Not thread-safe; intended for single-threaded unit tests.
+ *
+ * @param <CommT> type of the wrapped committable
+ */
+public class MockCommitRequest<CommT> implements Committer.CommitRequest<CommT> {
+
+    private final int numberOfRetries;
+    private CommT committable;
+    private Throwable knownFailure;
+    private Throwable unknownFailure;
+    private boolean retryRequested;
+    private boolean alreadyCommitted;
+
+    /**
+     * Creates a request representing the first commit attempt (zero retries).
+     *
+     * @param committable the committable handed to the committer
+     */
+    public MockCommitRequest(CommT committable) {
+        this(committable, 0);
+    }
+
+    /**
+     * Creates a request that reports the given retry count, to simulate re-delivered commits.
+     *
+     * @param committable the committable handed to the committer
+     * @param numberOfRetries value returned by {@link #getNumberOfRetries()}; must be >= 0
+     * @throws IllegalArgumentException if {@code numberOfRetries} is negative
+     */
+    public MockCommitRequest(CommT committable, int numberOfRetries) {
+        if (numberOfRetries < 0) {
+            throw new IllegalArgumentException("numberOfRetries must be >= 0: " + numberOfRetries);
+        }
+        this.committable = committable;
+        this.numberOfRetries = numberOfRetries;
+    }
+
+    @Override
+    public CommT getCommittable() {
+        return committable;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return numberOfRetries;
+    }
+
+    @Override
+    public void signalFailedWithKnownReason(Throwable throwable) {
+        this.knownFailure = throwable;
+    }
+
+    @Override
+    public void signalFailedWithUnknownReason(Throwable throwable) {
+        this.unknownFailure = throwable;
+    }
+
+    @Override
+    public void retryLater() {
+        this.retryRequested = true;
+    }
+
+    /** Replaces the wrapped committable (as the Flink contract requires) and records a retry. */
+    @Override
+    public void updateAndRetryLater(CommT commT) {
+        this.committable = commT;
+        this.retryRequested = true;
+    }
+
+    @Override
+    public void signalAlreadyCommitted() {
+        this.alreadyCommitted = true;
+    }
+
+    /** @return the throwable passed to {@code signalFailedWithKnownReason}, or null if never called */
+    public Throwable getKnownFailure() {
+        return knownFailure;
+    }
+
+    /** @return the throwable passed to {@code signalFailedWithUnknownReason}, or null if never called */
+    public Throwable getUnknownFailure() {
+        return unknownFailure;
+    }
+
+    /** @return true if {@code retryLater} or {@code updateAndRetryLater} was called */
+    public boolean isRetryRequested() {
+        return retryRequested;
+    }
+
+    /** @return true if {@code signalAlreadyCommitted} was called */
+    public boolean isAlreadyCommitted() {
+        return alreadyCommitted;
+    }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index 7cc2a88..794f806 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -25,7 +25,6 @@ import
org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
-
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -34,16 +33,15 @@ import org.apache.http.message.BasicStatusLine;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
+import org.mockito.MockedStatic;
+import org.slf4j.Logger;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
-import org.mockito.MockedStatic;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
-import org.slf4j.Logger;
/**
* Test for Doris Committer.
@@ -83,7 +81,8 @@ public class TestDorisCommitter {
"\"msg\": \"errCode = 2, detailMessage = transaction [2] is
already visible, not pre-committed.\"\n" +
"}";
this.entityMock.setValue(response);
- dorisCommitter.commit(Collections.singletonList(dorisCommittable));
+ final MockCommitRequest<DorisCommittable> request = new
MockCommitRequest<>(dorisCommittable);
+ dorisCommitter.commit(Collections.singletonList(request));
}
@@ -94,7 +93,8 @@ public class TestDorisCommitter {
"\"msg\": \"errCode = 2, detailMessage = transaction [25] is
already aborted. abort reason: User Abort\"\n" +
"}";
this.entityMock.setValue(response);
- dorisCommitter.commit(Collections.singletonList(dorisCommittable));
+ final MockCommitRequest<DorisCommittable> request = new
MockCommitRequest<>(dorisCommittable);
+ dorisCommitter.commit(Collections.singletonList(request));
}
@After
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index e988d6b..01e1559 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -23,7 +23,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpTestUtil;
import org.apache.doris.flink.sink.OptionUtils;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Assert;
@@ -31,6 +31,7 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
@@ -67,12 +68,13 @@ public class TestDorisWriter {
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext,
Collections.emptyList(), new SimpleStringSerializer(), dorisOptions,
readOptions, executionOptions);
dorisWriter.setDorisStreamLoad(dorisStreamLoad);
- List<DorisCommittable> committableList =
dorisWriter.prepareCommit(true);
-
+ dorisWriter.write("doris,1",null);
+ Collection<DorisCommittable> committableList =
dorisWriter.prepareCommit();
Assert.assertEquals(1, committableList.size());
- Assert.assertEquals("local:8040",
committableList.get(0).getHostPort());
- Assert.assertEquals("db_test", committableList.get(0).getDb());
- Assert.assertEquals(2, committableList.get(0).getTxnID());
+ DorisCommittable dorisCommittable =
committableList.stream().findFirst().get();
+ Assert.assertEquals("local:8040", dorisCommittable.getHostPort());
+ Assert.assertEquals("test", dorisCommittable.getDb());
+ Assert.assertEquals(2, dorisCommittable.getTxnID());
Assert.assertFalse(dorisWriter.isLoading());
}
@@ -91,6 +93,6 @@ public class TestDorisWriter {
Assert.assertEquals(1, writerStates.size());
Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
- Assert.assertTrue(dorisWriter.isLoading());
+ Assert.assertTrue(!dorisWriter.isLoading());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]