This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 67c9eb16 [Improve] fix doris source duplicate splitid (#414)
67c9eb16 is described below
commit 67c9eb16b6c3adcf24e95c0ad9f0b229fb40cdc2
Author: wudi <[email protected]>
AuthorDate: Wed Jul 3 14:21:52 2024 +0800
[Improve] fix doris source duplicate splitid (#414)
* Fix duplicate split IDs in the Doris source
* Code style fixes
---
.../org/apache/doris/flink/source/DorisSource.java | 7 +++++--
.../source/assigners/SimpleSplitAssigner.java | 4 ++++
.../source/enumerator/DorisSourceEnumerator.java | 2 +-
.../doris/flink/source/split/DorisSourceSplit.java | 10 +++++----
.../source/split/DorisSourceSplitSerializer.java | 24 ++++++++++++++++------
.../PendingSplitsCheckpointSerializerTest.java | 3 ++-
.../flink/source/reader/DorisSourceReaderTest.java | 2 +-
.../split/DorisSourceSplitSerializerTest.java | 3 ++-
.../flink/source/split/DorisSourceSplitTest.java | 4 ++--
9 files changed, 41 insertions(+), 18 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 55436b07..3c71c068 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -97,9 +97,12 @@ public class DorisSource<OUT>
List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
- partitions.forEach(m -> dorisSourceSplits.add(new
DorisSourceSplit(m)));
+ for (int index = 0; index < partitions.size(); index++) {
+ PartitionDefinition partitionDef = partitions.get(index);
+ String splitId = partitionDef.getBeAddress() + "_" + index;
+ dorisSourceSplits.add(new DorisSourceSplit(splitId, partitionDef));
+ }
DorisSplitAssigner splitAssigner = new
SimpleSplitAssigner(dorisSourceSplits);
-
return new DorisSourceEnumerator(context, splitAssigner);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
index d0dcf9d7..ee96f687 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
@@ -19,6 +19,8 @@ package org.apache.doris.flink.source.assigners;
import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -29,6 +31,7 @@ import java.util.Optional;
/** The {@code SimpleSplitAssigner} hands out splits in a random order. */
public class SimpleSplitAssigner implements DorisSplitAssigner {
+ private static final Logger LOG =
LoggerFactory.getLogger(SimpleSplitAssigner.class);
private final ArrayList<DorisSourceSplit> splits;
public SimpleSplitAssigner(Collection<DorisSourceSplit> splits) {
@@ -43,6 +46,7 @@ public class SimpleSplitAssigner implements DorisSplitAssigner {
@Override
public void addSplits(Collection<DorisSourceSplit> splits) {
+ LOG.info("Adding splits: {}", splits);
- splits.addAll(splits);
+ this.splits.addAll(splits); // the parameter shadows the field; without "this." returned splits are dropped
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
index a034b3b0..65fcc6fa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/enumerator/DorisSourceEnumerator.java
@@ -79,7 +79,7 @@ public class DorisSourceEnumerator
@Override
public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
- LOG.debug("Doris Source Enumerator adds splits back: {}", splits);
+ LOG.info("Doris Source Enumerator adds splits back: {}", splits);
splitAssigner.addSplits(splits);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
index b4d60a23..f80d4165 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplit.java
@@ -27,7 +27,7 @@ import java.util.Objects;
/** A {@link SourceSplit} that represents a {@link PartitionDefinition}. */
public class DorisSourceSplit implements SourceSplit {
-
+ private String id;
private final PartitionDefinition partitionDefinition;
/**
@@ -36,13 +36,14 @@ public class DorisSourceSplit implements SourceSplit {
*/
@Nullable transient byte[] serializedFormCache;
- public DorisSourceSplit(PartitionDefinition partitionDefinition) {
+ public DorisSourceSplit(String id, PartitionDefinition
partitionDefinition) {
+ this.id = id;
this.partitionDefinition = partitionDefinition;
}
@Override
public String splitId() {
- return partitionDefinition.getBeAddress();
+ return id;
}
public PartitionDefinition getPartitionDefinition() {
@@ -52,9 +53,10 @@ public class DorisSourceSplit implements SourceSplit {
@Override
public String toString() {
return String.format(
- "DorisSourceSplit: %s.%s,be=%s,tablets=%s",
+ "DorisSourceSplit: %s.%s,id=%s,be=%s,tablets=%s",
partitionDefinition.getDatabase(),
partitionDefinition.getTable(),
+ id,
partitionDefinition.getBeAddress(),
partitionDefinition.getTabletIds());
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
index 7e9468ec..cf7d27f6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializer.java
@@ -39,7 +39,7 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
- private static final int VERSION = 1;
+ private static final int VERSION = 2;
private static void writeLongArray(DataOutputView out, Long[] values)
throws IOException {
out.writeInt(values.length);
@@ -71,6 +71,7 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor
}
final DataOutputSerializer out = SERIALIZER_CACHE.get();
+
PartitionDefinition partDef = split.getPartitionDefinition();
out.writeUTF(partDef.getDatabase());
out.writeUTF(partDef.getTable());
@@ -81,6 +82,8 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor
out.writeInt(queryPlanBytes.length);
out.write(queryPlanBytes);
+ out.writeUTF(split.splitId());
+
final byte[] result = out.getCopyOfBuffer();
out.clear();
@@ -93,13 +96,16 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor
@Override
public DorisSourceSplit deserialize(int version, byte[] serialized) throws
IOException {
- if (version == 1) {
- return deserialize(serialized);
+ switch (version) {
+ case 1:
+ case 2:
+ return deserializeSplit(version, serialized);
+ default:
+ throw new IOException("Unknown version: " + version);
}
- throw new IOException("Unknown version: " + version);
}
- private DorisSourceSplit deserialize(byte[] serialized) throws IOException
{
+ private DorisSourceSplit deserializeSplit(int version, byte[] serialized)
throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
final String database = in.readUTF();
final String table = in.readUTF();
@@ -112,8 +118,14 @@ public class DorisSourceSplitSerializer implements SimpleVersionedSerializer<Dor
final byte[] bytes = new byte[len];
in.read(bytes);
final String queryPlan = new String(bytes, StandardCharsets.UTF_8);
+
+ // read split id; v1 did not persist one, so reuse the v1 id (the BE address) instead of a constant shared by all splits
+ String splitId = beAddress;
+ if (version >= 2) {
+ splitId = in.readUTF();
+ }
PartitionDefinition partDef =
new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
- return new DorisSourceSplit(partDef);
+ return new DorisSourceSplit(splitId, partDef);
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
index fa092796..4b7d315a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java
@@ -35,7 +35,8 @@ public class PendingSplitsCheckpointSerializerTest {
@Test
public void serializeSplit() throws Exception {
- final DorisSourceSplit split = new
DorisSourceSplit(OptionUtils.buildPartitionDef());
+ final DorisSourceSplit split =
+ new DorisSourceSplit("splitId",
OptionUtils.buildPartitionDef());
PendingSplitsCheckpoint checkpoint = new
PendingSplitsCheckpoint(Arrays.asList(split));
final PendingSplitsCheckpointSerializer splitSerializer =
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
index 1fc3165c..f044b25e 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -40,7 +40,7 @@ public class DorisSourceReaderTest {
}
private static DorisSourceSplit createTestDorisSplit() throws IOException {
- return new DorisSourceSplit(OptionUtils.buildPartitionDef());
+ return new DorisSourceSplit("splitId",
OptionUtils.buildPartitionDef());
}
@Test
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
index 0103ccb5..6fc721ce 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java
@@ -27,7 +27,8 @@ public class DorisSourceSplitSerializerTest {
@Test
public void serializeSplit() throws Exception {
- final DorisSourceSplit split = new
DorisSourceSplit(OptionUtils.buildPartitionDef());
+ final DorisSourceSplit split =
+ new DorisSourceSplit("splitId",
OptionUtils.buildPartitionDef());
DorisSourceSplit deSerialized = serializeAndDeserializeSplit(split);
assertEquals(split, deSerialized);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java
index 40db95a6..b633affc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitTest.java
@@ -31,8 +31,8 @@ public class DorisSourceSplitTest {
new PartitionDefinition("db", "tbl", "be", new HashSet<>(),
"queryplan1");
PartitionDefinition pd2 =
new PartitionDefinition("db", "tbl", "be", new HashSet<>(),
"queryplan1");
- DorisSourceSplit split1 = new DorisSourceSplit(pd1);
- DorisSourceSplit split2 = new DorisSourceSplit(pd2);
+ DorisSourceSplit split1 = new DorisSourceSplit("be_1", pd1);
+ DorisSourceSplit split2 = new DorisSourceSplit("be_2", pd2);
Assert.assertEquals(split1, split2);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]