thiagotnunes commented on code in PR #25311:
URL: https://github.com/apache/beam/pull/25311#discussion_r1099323297
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -494,6 +533,42 @@ private Mutation
createInsertMetadataMutationFrom(PartitionMetadata partitionMet
.build();
}
+ private Statement getPartitionsMatchingState(List<String> partitionTokens,
State state) {
+ Statement statement;
+ if (this.dialect == Dialect.POSTGRESQL) {
+ StringBuilder sqlStringBuilder =
+ new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");
Review Comment:
nit: instead of using `+`, we should use `append` for building all parts of
the `StringBuilder`
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -233,7 +233,7 @@ public ProcessContinuation run(
LOG.debug("[{}] Finishing partition", token);
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
- LOG.info("[{}] Partition finished", token);
+ LOG.info("[{}] After attempting to finish the partition", token);
Review Comment:
Is there a scenario where we couldn't finish it?
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
}
+ @Test
+ public void testInTransactionContextCannotUpdateToRunning() {
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ // assertEquals(0, mutations.getValue().size());
Review Comment:
Remove this commented-out assertion?
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
}
+ @Test
+ public void testInTransactionContextCannotUpdateToRunning() {
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ // assertEquals(0, mutations.getValue().size());
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToRunning() {
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap =
mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.RUNNING.toString(),
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToScheduled() {
+ System.out.println("Cannot update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+
assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToScheduled() {
+ System.out.println(" update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true).thenReturn(false);
+ when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+
assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap =
mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.SCHEDULED.toString(),
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToFinished() {
+ System.out.println("Cannot update to finished");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN));
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
@Test
public void testInTransactionContextUpdateToFinished() {
+ System.out.println("update to scheduled");
Review Comment:
Please remove the `println`
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -494,6 +533,42 @@ private Mutation
createInsertMetadataMutationFrom(PartitionMetadata partitionMet
.build();
}
+ private Statement getPartitionsMatchingState(List<String> partitionTokens,
State state) {
+ Statement statement;
+ if (this.dialect == Dialect.POSTGRESQL) {
+ StringBuilder sqlStringBuilder =
+ new StringBuilder("SELECT * FROM \"" + metadataTableName + "\"");
+ sqlStringBuilder.append(" WHERE \"");
+ sqlStringBuilder.append(COLUMN_STATE + "\" = " + "'" +
state.toString() + "'");
+ if (!partitionTokens.isEmpty()) {
+ sqlStringBuilder.append(" AND \"");
+ sqlStringBuilder.append(COLUMN_PARTITION_TOKEN);
+ sqlStringBuilder.append("\"");
+ sqlStringBuilder.append(" = ANY (Array[");
+ sqlStringBuilder.append(
+ partitionTokens.stream().map(s -> "'" + s +
"'").collect(Collectors.joining(",")));
Review Comment:
Just double checking that we can't do this with a parameterized query in
PG. It would save us a little time in the backend.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
}
+ @Test
+ public void testInTransactionContextCannotUpdateToRunning() {
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ // assertEquals(0, mutations.getValue().size());
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToRunning() {
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap =
mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.RUNNING.toString(),
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToScheduled() {
+ System.out.println("Cannot update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+
assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToScheduled() {
+ System.out.println(" update to scheduled");
Review Comment:
Please remove the `println`
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
}
+ @Test
+ public void testInTransactionContextCannotUpdateToRunning() {
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ // assertEquals(0, mutations.getValue().size());
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToRunning() {
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap =
mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.RUNNING.toString(),
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToScheduled() {
+ System.out.println("Cannot update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+
assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToScheduled() {
+ System.out.println(" update to scheduled");
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true).thenReturn(false);
+ when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
+
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+
assertNull(inTransactionContext.updateToScheduled(Collections.singletonList(PARTITION_TOKEN)));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap =
mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.SCHEDULED.toString(),
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToFinished() {
+ System.out.println("Cannot update to finished");
Review Comment:
Please remove the `println`
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java:
##########
@@ -149,8 +138,101 @@ public void testInTransactionContextInsert() {
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_WATERMARK).getTimestamp());
}
+ @Test
+ public void testInTransactionContextCannotUpdateToRunning() {
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(false);
+
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ // assertEquals(0, mutations.getValue().size());
+ verify(transaction, times(0)).buffer(mutations.capture());
+ }
+
+ @Test
+ public void testInTransactionContextUpdateToRunning() {
+ ResultSet resultSet = mock(ResultSet.class);
+ when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
+
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
+
+ ArgumentCaptor<ImmutableList<Mutation>> mutations =
+ ArgumentCaptor.forClass(ImmutableList.class);
+ doNothing().when(transaction).buffer(mutations.capture());
+ assertNull(inTransactionContext.updateToRunning(PARTITION_TOKEN));
+ assertEquals(1, mutations.getValue().size());
+ Map<String, Value> mutationValueMap =
mutations.getValue().iterator().next().asMap();
+ assertEquals(
+ PARTITION_TOKEN,
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN).getString());
+ assertEquals(
+ PartitionMetadata.State.RUNNING.toString(),
+
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
+ }
+
+ @Test
+ public void testInTransactionContextCannotUpdateToScheduled() {
+ System.out.println("Cannot update to scheduled");
Review Comment:
Please remove the `println`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]