This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ac57b8709d3 Optimize change stream connector with more efficient
batching and blind writes, and add transaction/query tags (#25718)
ac57b8709d3 is described below
commit ac57b8709d3f2db597a5d9e6339545b0f2036a3a
Author: ChangyuLi28 <[email protected]>
AuthorDate: Fri Mar 17 15:13:27 2023 -0700
Optimize change stream connector with more efficient batching and blind
writes, and add transaction/query tags (#25718)
* Optimize change stream connector with more efficient batching and blind
writes, and add transaction/query tags
* Apply commit deadline only for metadata table writes
---------
Co-authored-by: Changyu Li <[email protected]>
---
.../MetadataSpannerConfigFactory.java | 2 +-
.../action/ChildPartitionsRecordAction.java | 3 +-
.../action/DetectNewPartitionsAction.java | 19 ++++--
.../changestreams/dao/PartitionMetadataDao.java | 68 +++++++++++++++-------
.../action/ChildPartitionsRecordActionTest.java | 4 +-
.../dao/PartitionMetadataDaoTest.java | 29 +++------
6 files changed, 75 insertions(+), 50 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
index 83965b1bfaa..56c67f3194f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
@@ -84,7 +84,7 @@ public class MetadataSpannerConfigFactory {
ValueProvider<Duration> commitDeadline = primaryConfig.getCommitDeadline();
if (commitDeadline != null) {
- config =
config.withCommitDeadline(StaticValueProvider.of(commitDeadline.get()));
+ config =
config.withCommitDeadline(StaticValueProvider.of(Duration.standardSeconds(60)));
}
ValueProvider<Duration> maxCumulativeBackoff =
primaryConfig.getMaxCumulativeBackoff();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
index 70286b41778..7fb69d0e7ab 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
@@ -155,7 +155,8 @@ public class ChildPartitionsRecordAction {
} else {
return false;
}
- })
+ },
+ "InsertChildPartition")
.getResult();
if (insertedRow && isSplit) {
metrics.incPartitionRecordSplitCount();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index 934210250f5..73967d2a2a7 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -146,15 +146,22 @@ public class DetectNewPartitionsAction {
OutputReceiver<PartitionMetadata> receiver,
Timestamp minWatermark,
TreeMap<Timestamp, List<PartitionMetadata>> batches) {
+ List<PartitionMetadata> batchPartitionsDifferentCreatedAt = new
ArrayList<>();
+ int numTimestampsHandledSofar = 0;
for (Map.Entry<Timestamp, List<PartitionMetadata>> batch :
batches.entrySet()) {
+ numTimestampsHandledSofar++;
final Timestamp batchCreatedAt = batch.getKey();
- final List<PartitionMetadata> batchPartitions = batch.getValue();
-
- final Timestamp scheduledAt = updateBatchToScheduled(batchPartitions);
- if (!tracker.tryClaim(batchCreatedAt)) {
- return ProcessContinuation.stop();
+ final List<PartitionMetadata> batchPartitionsSameCreatedAt =
batch.getValue();
+ batchPartitionsDifferentCreatedAt.addAll(batchPartitionsSameCreatedAt);
+ if (batchPartitionsDifferentCreatedAt.size() >= 200
+ || numTimestampsHandledSofar == batches.size()) {
+ final Timestamp scheduledAt =
updateBatchToScheduled(batchPartitionsDifferentCreatedAt);
+ if (!tracker.tryClaim(batchCreatedAt)) {
+ return ProcessContinuation.stop();
+ }
+ outputBatch(receiver, minWatermark, batchPartitionsDifferentCreatedAt,
scheduledAt);
+ batchPartitionsDifferentCreatedAt = new ArrayList<>();
}
- outputBatch(receiver, minWatermark, batchPartitions, scheduledAt);
}
return ProcessContinuation.resume().withResumeDelay(resumeDuration);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index 6dc0e7a580d..bde04bb1dec 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -33,6 +33,7 @@ import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
@@ -89,7 +90,8 @@ public class PartitionMetadataDao {
try (ResultSet queryResultSet =
databaseClient
.singleUseReadOnlyTransaction()
- .executeQuery(Statement.of(checkTableExistsStmt))) {
+ .executeQuery(
+ Statement.of(checkTableExistsStmt),
Options.tag("query=checkTableExists"))) {
return queryResultSet.next();
}
}
@@ -126,7 +128,8 @@ public class PartitionMetadataDao {
.to(partitionToken)
.build();
}
- try (ResultSet resultSet =
databaseClient.singleUse().executeQuery(statement)) {
+ try (ResultSet resultSet =
+ databaseClient.singleUse().executeQuery(statement,
Options.tag("query=getPartition"))) {
if (resultSet.next()) {
return resultSet.getCurrentRowAsStruct();
}
@@ -175,7 +178,10 @@ public class PartitionMetadataDao {
.to(State.FINISHED.name())
.build();
}
- try (ResultSet resultSet =
databaseClient.singleUse().executeQuery(statement)) {
+ try (ResultSet resultSet =
+ databaseClient
+ .singleUse()
+ .executeQuery(statement,
Options.tag("query=getUnfinishedMinWatermark"))) {
if (resultSet.next()) {
return resultSet.getTimestamp(COLUMN_WATERMARK);
}
@@ -226,7 +232,9 @@ public class PartitionMetadataDao {
.to(timestamp)
.build();
}
- return databaseClient.singleUse().executeQuery(statement);
+ return databaseClient
+ .singleUse()
+ .executeQuery(statement,
Options.tag("query=getAllPartitionsCreatedAfter"));
}
/**
@@ -259,7 +267,10 @@ public class PartitionMetadataDao {
.build();
}
- try (ResultSet resultSet =
databaseClient.singleUse().executeQuery(statement)) {
+ try (ResultSet resultSet =
+ databaseClient
+ .singleUse()
+ .executeQuery(statement,
Options.tag("query=countPartitionsCreatedAfter"))) {
if (resultSet.next()) {
return resultSet.getLong("count");
} else {
@@ -280,7 +291,7 @@ public class PartitionMetadataDao {
*/
public Timestamp insert(PartitionMetadata row) {
final TransactionResult<Void> transactionResult =
- runInTransaction(transaction -> transaction.insert(row));
+ runInTransaction(transaction -> transaction.insert(row),
"InsertsPartitionMetadata");
return transactionResult.getCommitTimestamp();
}
@@ -292,7 +303,8 @@ public class PartitionMetadataDao {
*/
public Timestamp updateToScheduled(List<String> partitionTokens) {
final TransactionResult<Void> transactionResult =
- runInTransaction(transaction ->
transaction.updateToScheduled(partitionTokens));
+ runInTransaction(
+ transaction -> transaction.updateToScheduled(partitionTokens),
"updateToScheduled");
return transactionResult.getCommitTimestamp();
}
@@ -304,7 +316,8 @@ public class PartitionMetadataDao {
*/
public Timestamp updateToRunning(String partitionToken) {
final TransactionResult<Void> transactionResult =
- runInTransaction(transaction ->
transaction.updateToRunning(partitionToken));
+ runInTransaction(
+ transaction -> transaction.updateToRunning(partitionToken),
"updateToRunning");
return transactionResult.getCommitTimestamp();
}
@@ -316,7 +329,8 @@ public class PartitionMetadataDao {
*/
public Timestamp updateToFinished(String partitionToken) {
final TransactionResult<Void> transactionResult =
- runInTransaction(transaction ->
transaction.updateToFinished(partitionToken));
+ runInTransaction(
+ transaction -> transaction.updateToFinished(partitionToken),
"updateToFinished");
return transactionResult.getCommitTimestamp();
}
@@ -327,7 +341,8 @@ public class PartitionMetadataDao {
* @param watermark the new partition watermark
*/
public void updateWatermark(String partitionToken, Timestamp watermark) {
- runInTransaction(transaction ->
transaction.updateWatermark(partitionToken, watermark));
+ runInTransaction(
+ transaction -> transaction.updateWatermark(partitionToken, watermark),
"updateWatermark");
}
/**
@@ -352,6 +367,20 @@ public class PartitionMetadataDao {
return new TransactionResult<>(result,
readWriteTransaction.getCommitTimestamp());
}
+ public <T> TransactionResult<T> runInTransaction(
+ Function<InTransactionContext, T> callable, String tagName) {
+ final TransactionRunner readWriteTransaction =
+ databaseClient.readWriteTransaction(Options.tag(tagName));
+ final T result =
+ readWriteTransaction.run(
+ transaction -> {
+ final InTransactionContext transactionContext =
+ new InTransactionContext(metadataTableName, transaction,
this.dialect);
+ return callable.apply(transactionContext);
+ });
+ return new TransactionResult<>(result,
readWriteTransaction.getCommitTimestamp());
+ }
+
/** Represents the execution of a read / write transaction in Cloud Spanner.
*/
public static class InTransactionContext {
private static final Logger LOG =
LoggerFactory.getLogger(InTransactionContext.class);
@@ -398,7 +427,8 @@ public class PartitionMetadataDao {
public Void updateToScheduled(List<String> partitionTokens) {
HashSet<String> tokens = new HashSet<>();
Statement statement = getPartitionsMatchingState(partitionTokens,
State.CREATED);
- try (ResultSet resultSet = transaction.executeQuery(statement)) {
+ try (ResultSet resultSet =
+ transaction.executeQuery(statement,
Options.tag("getPartitionsMatchingState=CREATED"))) {
while (resultSet.next()) {
tokens.add(resultSet.getString(COLUMN_PARTITION_TOKEN));
}
@@ -427,7 +457,9 @@ public class PartitionMetadataDao {
Statement statement =
getPartitionsMatchingState(Collections.singletonList(partitionToken),
State.SCHEDULED);
- try (ResultSet resultSet = transaction.executeQuery(statement)) {
+ try (ResultSet resultSet =
+ transaction.executeQuery(
+ statement, Options.tag("getPartitionsMatchingState=SCHEDULED")))
{
if (!resultSet.next()) {
LOG.info("[{}] Did not update to be RUNNING", partitionToken);
return null;
@@ -445,14 +477,6 @@ public class PartitionMetadataDao {
* @param partitionToken the partition unique identifier
*/
public Void updateToFinished(String partitionToken) {
- Statement statement =
-
getPartitionsMatchingState(Collections.singletonList(partitionToken),
State.RUNNING);
- try (ResultSet resultSet = transaction.executeQuery(statement)) {
- if (!resultSet.next()) {
- LOG.info("[{}] Did not update to be FINISHED", partitionToken);
- return null;
- }
- }
LOG.info("[{}] Successfully updating to be FINISHED", partitionToken);
transaction.buffer(
ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken,
State.FINISHED)));
@@ -504,7 +528,9 @@ public class PartitionMetadataDao {
.to(partitionToken)
.build();
}
- try (ResultSet resultSet = transaction.executeQuery(statement)) {
+ try (ResultSet resultSet =
+ transaction.executeQuery(
+ statement,
Options.tag("getPartitionMetadataRowForGivenPartitionToken"))) {
if (resultSet.next()) {
return resultSet.getCurrentRowAsStruct();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
index 78b10368fa6..63efaca0c82 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
@@ -20,6 +20,7 @@ package
org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -65,7 +66,8 @@ public class ChildPartitionsRecordActionTest {
tracker = mock(RestrictionTracker.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);
- when(dao.runInTransaction(any())).thenAnswer(new
TestTransactionAnswer(transaction));
+ when(dao.runInTransaction(any(), anyObject()))
+ .thenAnswer(new TestTransactionAnswer(transaction));
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
index c286ab84aeb..907fdfb3653 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -96,12 +97,13 @@ public class PartitionMetadataDaoTest {
@Test
public void testInsert() {
+
when(databaseClient.readWriteTransaction(anyObject())).thenReturn(readWriteTransactionRunner);
when(databaseClient.readWriteTransaction()).thenReturn(readWriteTransactionRunner);
when(readWriteTransactionRunner.run(any())).thenReturn(null);
when(readWriteTransactionRunner.getCommitTimestamp())
.thenReturn(Timestamp.ofTimeMicroseconds(1L));
Timestamp commitTimestamp = partitionMetadataDao.insert(ROW);
- verify(databaseClient, times(1)).readWriteTransaction();
+ verify(databaseClient, times(1)).readWriteTransaction(anyObject());
verify(readWriteTransactionRunner, times(1)).run(any());
verify(readWriteTransactionRunner, times(1)).getCommitTimestamp();
assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp);
@@ -143,7 +145,7 @@ public class PartitionMetadataDaoTest {
ArgumentCaptor<ImmutableList<Mutation>> mutations =
ArgumentCaptor.forClass(ImmutableList.class);
ResultSet resultSet = mock(ResultSet.class);
- when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);
doNothing().when(transaction).buffer(mutations.capture());
@@ -155,7 +157,7 @@ public class PartitionMetadataDaoTest {
@Test
public void testInTransactionContextUpdateToRunning() {
ResultSet resultSet = mock(ResultSet.class);
- when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true);
when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
@@ -178,7 +180,7 @@ public class PartitionMetadataDaoTest {
public void testInTransactionContextCannotUpdateToScheduled() {
System.out.println("Cannot update to scheduled");
ResultSet resultSet = mock(ResultSet.class);
- when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);
ArgumentCaptor<ImmutableList<Mutation>> mutations =
@@ -192,7 +194,7 @@ public class PartitionMetadataDaoTest {
public void testInTransactionContextUpdateToScheduled() {
System.out.println(" update to scheduled");
ResultSet resultSet = mock(ResultSet.class);
- when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true).thenReturn(false);
when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
@@ -211,19 +213,6 @@ public class PartitionMetadataDaoTest {
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
}
- @Test
- public void testInTransactionContextCannotUpdateToFinished() {
- System.out.println("Cannot update to finished");
- ResultSet resultSet = mock(ResultSet.class);
- when(transaction.executeQuery(any())).thenReturn(resultSet);
- when(resultSet.next()).thenReturn(false);
-
- ArgumentCaptor<ImmutableList<Mutation>> mutations =
- ArgumentCaptor.forClass(ImmutableList.class);
- assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN));
- verify(transaction, times(0)).buffer(mutations.capture());
- }
-
@Test
public void testInTransactionContextUpdateToFinished() {
System.out.println("update to scheduled");
@@ -263,7 +252,7 @@ public class PartitionMetadataDaoTest {
@Test
public void testInTransactionContextGetPartitionWithNoPartitions() {
ResultSet resultSet = mock(ResultSet.class);
- when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);
assertNull(inTransactionContext.getPartition(PARTITION_TOKEN));
}
@@ -271,7 +260,7 @@ public class PartitionMetadataDaoTest {
@Test
public void testInTransactionContextGetPartitionWithPartitions() {
ResultSet resultSet = mock(ResultSet.class);
- when(transaction.executeQuery(any())).thenReturn(resultSet);
+ when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true);
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
assertNotNull(inTransactionContext.getPartition(PARTITION_TOKEN));