thiagotnunes commented on code in PR #25309:
URL: https://github.com/apache/beam/pull/25309#discussion_r1096266289


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -390,11 +392,30 @@ public Void insert(PartitionMetadata row) {
      * @param partitionTokens the partitions' unique identifiers
      */
     public Void updateToScheduled(List<String> partitionTokens) {
-      final List<Mutation> mutations =
-          partitionTokens.stream()
-              .map(token -> createUpdateMetadataStateMutationFrom(token, State.SCHEDULED))
-              .collect(Collectors.toList());
-      transaction.buffer(mutations);
+      HashMap<String, State> tokenToState = new HashMap<>();
+      Statement statement = createPartitionQueryStatement(partitionTokens);
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        while (resultSet.next()) {
+          if (resultSet.getString(COLUMN_PARTITION_TOKEN) == null

Review Comment:
   Why would the partition token / state be `null`? I think they are actually `NOT NULL` columns.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -390,11 +392,30 @@ public Void insert(PartitionMetadata row) {
      * @param partitionTokens the partitions' unique identifiers
      */
     public Void updateToScheduled(List<String> partitionTokens) {
-      final List<Mutation> mutations =
-          partitionTokens.stream()
-              .map(token -> createUpdateMetadataStateMutationFrom(token, State.SCHEDULED))
-              .collect(Collectors.toList());
-      transaction.buffer(mutations);
+      HashMap<String, State> tokenToState = new HashMap<>();
+      Statement statement = createPartitionQueryStatement(partitionTokens);
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        while (resultSet.next()) {
+          if (resultSet.getString(COLUMN_PARTITION_TOKEN) == null
+              || resultSet.getString(COLUMN_STATE) == null) {
+            continue;
+          }
+          tokenToState.put(
+              resultSet.getString(COLUMN_PARTITION_TOKEN),
+              State.valueOf(resultSet.getString(COLUMN_STATE)));
+        }
+      }
+
+      for (String partitionToken : partitionTokens) {

Review Comment:
   Instead of doing this programmatically, could we do a `SELECT * FROM metadata_table WHERE state = CREATED AND partition_token IN (@partitionTokens)`?
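
   A minimal sketch of what that filtered query could look like, written in plain Java so it runs without the Spanner client. Everything named here is an assumption for illustration: the class `ScheduledQuerySketch`, the column names `PartitionToken` and `State`, and the table name passed in. It also assumes GoogleSQL, where an array parameter is matched with `IN UNNEST(@param)` rather than `IN (@param)`, and where the state is bound as a string parameter instead of being written as a bare `CREATED`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of PartitionMetadataDao.
final class ScheduledQuerySketch {
  // Assumed column names; the DAO refers to them through
  // COLUMN_PARTITION_TOKEN and COLUMN_STATE.
  static final String COLUMN_PARTITION_TOKEN = "PartitionToken";
  static final String COLUMN_STATE = "State";

  // Builds the query text. Only rows that are still in the requested state
  // and whose token is in the supplied array are returned, so no per-row
  // state check is needed on the client side.
  static String buildSql(String tableName) {
    return "SELECT " + COLUMN_PARTITION_TOKEN
        + " FROM " + tableName
        + " WHERE " + COLUMN_STATE + " = @state"
        + " AND " + COLUMN_PARTITION_TOKEN + " IN UNNEST(@partitionTokens)";
  }

  // Collects the values to bind to @state and @partitionTokens.
  static Map<String, Object> buildParams(List<String> partitionTokens) {
    Map<String, Object> params = new LinkedHashMap<>();
    params.put("state", "CREATED");
    params.put("partitionTokens", new ArrayList<>(partitionTokens));
    return params;
  }
}
```

   In the DAO itself the same text and values would presumably go through `Statement.newBuilder(sql).bind("state").to(...).bind("partitionTokens").toStringArray(...).build()`, and the returned tokens could then be passed straight to the existing mutation builder.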



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -404,6 +425,18 @@ public Void updateToScheduled(List<String> partitionTokens) {
      * @param partitionToken the partition unique identifier
      */
     public Void updateToRunning(String partitionToken) {
+      Statement statement =
+          createPartitionQueryStatement(Collections.singletonList(partitionToken));
+      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+        if (resultSet.next()) {
+          if (resultSet.getString(COLUMN_STATE) == null

Review Comment:
   I don't think the state can be `NULL`.


