This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e964c03dca [Hotfix][Mongodb cdc] Solve startup resume token is
negative (#5143)
e964c03dca is described below
commit e964c03dca69f3a95da72fee2fba68e5cf24485d
Author: monster <[email protected]>
AuthorDate: Tue Jul 25 10:09:11 2023 +0800
[Hotfix][Mongodb cdc] Solve startup resume token is negative (#5143)
---------
Co-authored-by: chenzy15 <[email protected]>
---
.../cdc/mongodb/source/dialect/MongodbDialect.java | 7 ++++
.../seatunnel/cdc/mongodb/utils/ResumeToken.java | 46 +++++++++-------------
.../src/test/java/mongodb/MongoDBContainer.java | 1 +
3 files changed, 26 insertions(+), 28 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
index 11ef57ffc5..25e463c17e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
@@ -34,6 +34,7 @@ import org.bson.BsonDocument;
import com.mongodb.client.MongoClient;
import io.debezium.relational.TableId;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
@@ -52,6 +53,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getCurrentClusterTime;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getLatestResumeToken;
+@Slf4j
public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
private final Map<MongodbSourceConfig,
CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache =
@@ -137,6 +139,11 @@ public class MongodbDialect implements
DataSourceDialect<MongodbSourceConfig> {
ChangeStreamOffset changeStreamOffset;
if (startupResumeToken != null) {
changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
+ log.info(
+ "startup resume token={},change stream offset={}",
+ startupResumeToken,
+ changeStreamOffset);
+
} else {
changeStreamOffset = new
ChangeStreamOffset(getCurrentClusterTime(mongoClient));
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
index 3ddd2ccbb2..5ee8962bc5 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;
-import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
-
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
@@ -29,41 +27,33 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;
-import static
org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
-
public class ResumeToken {
private static final int K_TIMESTAMP = 130;
- public static @Nonnull BsonTimestamp decodeTimestamp(BsonDocument
resumeToken) {
- Objects.requireNonNull(resumeToken, "Missing ResumeToken.");
- BsonValue bsonValue = resumeToken.get("_data");
- byte[] keyStringBytes = extractKeyStringBytes(bsonValue);
- validateKeyType(keyStringBytes);
-
- ByteBuffer buffer =
ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
- int t = buffer.getInt();
- int i = buffer.getInt();
- return new BsonTimestamp(t, i);
- }
-
- private static byte[] extractKeyStringBytes(@Nonnull BsonValue bsonValue) {
- if (bsonValue.isBinary()) {
- return bsonValue.asBinary().getData();
- } else if (bsonValue.isString()) {
- return hexToUint8Array(bsonValue.asString().getValue());
+ public static BsonTimestamp decodeTimestamp(BsonDocument resumeToken) {
+ BsonValue bsonValue =
+ Objects.requireNonNull(resumeToken, "Missing ResumeToken.").get("_data");
+ final byte[] keyStringBytes;
+ // Resume Tokens format: https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens
+ if (bsonValue.isBinary()) { // BinData
+ keyStringBytes = bsonValue.asBinary().getData();
+ } else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1)
+ keyStringBytes = hexToUint8Array(bsonValue.asString().getValue());
} else {
- throw new MongodbConnectorException(
- ILLEGAL_ARGUMENT, "Unknown resume token format: " +
bsonValue);
+ throw new IllegalArgumentException(
+ "Unknown resume token format: " + resumeToken.toJson());
}
- }
- private static void validateKeyType(byte[] keyStringBytes) {
- int kType = keyStringBytes[0] & 0xff;
+ ByteBuffer buffer =
ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
+ int kType = buffer.get() & 0xff;
if (kType != K_TIMESTAMP) {
- throw new MongodbConnectorException(
- ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " +
kType);
+ throw new IllegalArgumentException("Unknown keyType of timestamp: " + kType);
}
+
+ int t = buffer.getInt();
+ int i = buffer.getInt();
+ return new BsonTimestamp(t, i);
}
private static byte[] hexToUint8Array(@Nonnull String str) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
index c33f6d047d..a311bccc90 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
@@ -84,6 +84,7 @@ public class MongoDBContainer extends
GenericContainer<MongoDBContainer> {
withExposedPorts(MONGODB_PORT);
withCommand(ShardingClusterRole.startupCommand(clusterRole));
waitingFor(clusterRole.waitStrategy);
+ withEnv("TZ", "Asia/Shanghai");
}
public void executeCommand(String command) {