This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e964c03dca [Hotfix][Mongodb cdc] Solve startup resume token is negative (#5143)
e964c03dca is described below

commit e964c03dca69f3a95da72fee2fba68e5cf24485d
Author: monster <[email protected]>
AuthorDate: Tue Jul 25 10:09:11 2023 +0800

    [Hotfix][Mongodb cdc] Solve startup resume token is negative (#5143)
    
    
    ---------
    
    Co-authored-by: chenzy15 <[email protected]>
---
 .../cdc/mongodb/source/dialect/MongodbDialect.java |  7 ++++
 .../seatunnel/cdc/mongodb/utils/ResumeToken.java   | 46 +++++++++-------------
 .../src/test/java/mongodb/MongoDBContainer.java    |  1 +
 3 files changed, 26 insertions(+), 28 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
index 11ef57ffc5..25e463c17e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
@@ -34,6 +34,7 @@ import org.bson.BsonDocument;
 
 import com.mongodb.client.MongoClient;
 import io.debezium.relational.TableId;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nonnull;
 
@@ -52,6 +53,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getCurrentClusterTime;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getLatestResumeToken;
 
+@Slf4j
 public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
 
     private final Map<MongodbSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache =
@@ -137,6 +139,11 @@ public class MongodbDialect implements DataSourceDialect<MongodbSourceConfig> {
         ChangeStreamOffset changeStreamOffset;
         if (startupResumeToken != null) {
             changeStreamOffset = new ChangeStreamOffset(startupResumeToken);
+            log.info(
+                    "startup resume token={},change stream offset={}",
+                    startupResumeToken,
+                    changeStreamOffset);
+
         } else {
             changeStreamOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
         }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
index 3ddd2ccbb2..5ee8962bc5 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;
 
-import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
-
 import org.bson.BsonDocument;
 import org.bson.BsonTimestamp;
 import org.bson.BsonValue;
@@ -29,41 +27,33 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Objects;
 
-import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
-
 public class ResumeToken {
 
     private static final int K_TIMESTAMP = 130;
 
-    public static @Nonnull BsonTimestamp decodeTimestamp(BsonDocument resumeToken) {
-        Objects.requireNonNull(resumeToken, "Missing ResumeToken.");
-        BsonValue bsonValue = resumeToken.get("_data");
-        byte[] keyStringBytes = extractKeyStringBytes(bsonValue);
-        validateKeyType(keyStringBytes);
-
-        ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
-        int t = buffer.getInt();
-        int i = buffer.getInt();
-        return new BsonTimestamp(t, i);
-    }
-
-    private static byte[] extractKeyStringBytes(@Nonnull BsonValue bsonValue) {
-        if (bsonValue.isBinary()) {
-            return bsonValue.asBinary().getData();
-        } else if (bsonValue.isString()) {
-            return hexToUint8Array(bsonValue.asString().getValue());
+    public static BsonTimestamp decodeTimestamp(BsonDocument resumeToken) {
+        BsonValue bsonValue =
+                Objects.requireNonNull(resumeToken, "Missing ResumeToken.").get("_data");
+        final byte[] keyStringBytes;
+        // Resume Tokens format: https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens
+        if (bsonValue.isBinary()) { // BinData
+            keyStringBytes = bsonValue.asBinary().getData();
+        } else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1)
+            keyStringBytes = hexToUint8Array(bsonValue.asString().getValue());
         } else {
-            throw new MongodbConnectorException(
-                    ILLEGAL_ARGUMENT, "Unknown resume token format: " + bsonValue);
+            throw new IllegalArgumentException(
+                    "Unknown resume token format: " + resumeToken.toJson());
         }
-    }
 
-    private static void validateKeyType(byte[] keyStringBytes) {
-        int kType = keyStringBytes[0] & 0xff;
+        ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN);
+        int kType = buffer.get() & 0xff;
         if (kType != K_TIMESTAMP) {
-            throw new MongodbConnectorException(
-                    ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " + kType);
+            throw new IllegalArgumentException("Unknown keyType of timestamp: " + kType);
         }
+
+        int t = buffer.getInt();
+        int i = buffer.getInt();
+        return new BsonTimestamp(t, i);
     }
 
     private static byte[] hexToUint8Array(@Nonnull String str) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
index c33f6d047d..a311bccc90 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
@@ -84,6 +84,7 @@ public class MongoDBContainer extends GenericContainer<MongoDBContainer> {
         withExposedPorts(MONGODB_PORT);
         withCommand(ShardingClusterRole.startupCommand(clusterRole));
         waitingFor(clusterRole.waitStrategy);
+        withEnv("TZ", "Asia/Shanghai");
     }
 
     public void executeCommand(String command) {

Reply via email to