This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee35de68067 [improve] [txn] [PIP-160] Protocol changes and configuration changes for transaction batch log (#16617)
ee35de68067 is described below
commit ee35de680676a12703207126ad0a1ee403dc46be
Author: fengyubiao <[email protected]>
AuthorDate: Sun Jul 17 21:24:07 2022 +0800
[improve] [txn] [PIP-160] Protocol changes and configuration changes for transaction batch log (#16617)
### Modifications
I will complete proposal #15370 with the following pull requests (*this pull request is step 2*):
1. Write the transaction batch log handler: `TxnLogBufferedWriter`.
2. Configuration changes and protocol changes.
3. `MLPendingAckStore` and `MLTransactionLogImpl` support reading batched logs.
4. `MLPendingAckStore` and `MLTransactionLogImpl` support writing batched logs and dynamic configuration.
5. Add an admin API and docs (admin and configuration docs) for the transaction batch log.
6. Add metrics support for the transaction batch log.
---
conf/broker.conf | 31 +++++++++++
conf/standalone.conf | 31 +++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 61 ++++++++++++++++++++++
.../src/main/proto/TransactionPendingAck.proto | 5 ++
.../common/naming/ServiceConfigurationTest.java | 59 +++++++++++++++++++++
.../configurations/pulsar_broker_test.conf | 10 ++++
.../src/main/proto/PulsarTransactionMetadata.proto | 5 ++
.../version-2.3.0/reference-configuration.md | 8 +++
8 files changed, 210 insertions(+)
diff --git a/conf/broker.conf b/conf/broker.conf
index b801800c728..d2e13d117e8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1462,6 +1462,37 @@ transactionPendingAckLogIndexMinLag=500
# The transaction buffer client's operation timeout in milliseconds.
transactionBufferClientOperationTimeoutInMills=3000
+# Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched record and
+# persist it into a single BookKeeper (BK) entry. This makes Pulsar transactions more efficient (a.k.a. batched log).
+# See: https://github.com/apache/pulsar/issues/15370
+transactionLogBatchedWriteEnabled=false
+
+# When transaction log batching is enabled, the maximum number of log records in a batch.
+transactionLogBatchedWriteMaxRecords=512
+
+# When transaction log batching is enabled, the maximum size of a batch in bytes (default: 4 MB).
+transactionLogBatchedWriteMaxSize=4194304
+
+# When transaction log batching is enabled, the maximum time (in milliseconds) the first
+# record in a batch waits before the batch is flushed.
+transactionLogBatchedWriteMaxDelayInMillis=1
+
+# Provide a mechanism allowing the Pending Ack Store to aggregate multiple records into a batched record and persist
+# it into a single BookKeeper (BK) entry. This makes Pulsar transactions more efficient (a.k.a. batched log).
+# See: https://github.com/apache/pulsar/issues/15370
+transactionPendingAckBatchedWriteEnabled=false
+
+# When transaction pending ack log batching is enabled, the maximum number of log records
+# in a batch.
+transactionPendingAckBatchedWriteMaxRecords=512
+
+# When transaction pending ack log batching is enabled, the maximum size of a batch in bytes (default: 4 MB).
+transactionPendingAckBatchedWriteMaxSize=4194304
+
+# When transaction pending ack log batching is enabled, the maximum time (in milliseconds) the first
+# record in a batch waits before the batch is flushed.
+transactionPendingAckBatchedWriteMaxDelayInMillis=1
+
### --- Packages management service configuration variables (begin) --- ###
# Enable the packages management service or not
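The three `transactionLogBatchedWrite*` limits above describe when a buffered batch is flushed: once it holds `MaxRecords` records, presumably once it reaches `MaxSize` bytes, or once its first record has waited `MaxDelayInMillis`. The following is a minimal, hypothetical Java sketch of that flush policy under those assumptions. `BatchFlushPolicySketch` is not `TxnLogBufferedWriter`; it takes caller-supplied timestamps instead of running a real timer, and the real writer may handle edge cases (for example a single oversized record) differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the batching flush policy; NOT Pulsar's TxnLogBufferedWriter.
class BatchFlushPolicySketch {
    private final int maxRecords;      // like transactionLogBatchedWriteMaxRecords
    private final int maxSizeBytes;    // like transactionLogBatchedWriteMaxSize
    private final long maxDelayMillis; // like transactionLogBatchedWriteMaxDelayInMillis

    private final List<byte[]> pending = new ArrayList<>();
    private final List<List<byte[]>> flushedBatches = new ArrayList<>();
    private int pendingBytes = 0;
    private long firstRecordAtMillis = -1;

    BatchFlushPolicySketch(int maxRecords, int maxSizeBytes, long maxDelayMillis) {
        this.maxRecords = maxRecords;
        this.maxSizeBytes = maxSizeBytes;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Buffers one record and flushes if the record-count or byte-size limit is reached.
    void add(byte[] record, long nowMillis) {
        if (pending.isEmpty()) {
            firstRecordAtMillis = nowMillis; // the delay is measured from the first record
        }
        pending.add(record);
        pendingBytes += record.length;
        if (pending.size() >= maxRecords || pendingBytes >= maxSizeBytes) {
            flush();
        }
    }

    // Meant to be called periodically; flushes once the first record has waited maxDelayMillis.
    void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - firstRecordAtMillis >= maxDelayMillis) {
            flush();
        }
    }

    // In the real stores, each flushed batch would be persisted as a single BookKeeper entry.
    private void flush() {
        flushedBatches.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
        firstRecordAtMillis = -1;
    }

    List<List<byte[]>> flushedBatches() {
        return flushedBatches;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        // Tiny limits so every trigger is visible: 3 records, 100 bytes, 1 ms.
        BatchFlushPolicySketch buffer = new BatchFlushPolicySketch(3, 100, 1);
        buffer.add(new byte[10], 0);
        buffer.add(new byte[10], 0);
        buffer.add(new byte[10], 0);  // 3rd record reaches maxRecords: flush
        buffer.add(new byte[120], 0); // 120 bytes >= maxSizeBytes: flush
        buffer.add(new byte[10], 5);
        buffer.tick(5);               // first record has waited 0 ms: keep buffering
        buffer.tick(6);               // first record has waited 1 ms: flush
        System.out.println(buffer.flushedBatches().size()); // prints 3
    }
}
```

With the shipped defaults (512 records, 4194304 bytes, 1 ms), the delay limit bounds the extra latency a lightly loaded broker pays for batching, while the count and size limits cap how much is packed into one entry under heavy load.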
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 7796e3e41e8..f6bf8fe91c5 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1073,6 +1073,37 @@ transactionBufferSnapshotMinTimeInMillis=5000
# The transaction buffer client's operation timeout in milliseconds.
transactionBufferClientOperationTimeoutInMills=3000
+# Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched record and
+# persist it into a single BookKeeper (BK) entry. This makes Pulsar transactions more efficient (a.k.a. batched log).
+# See: https://github.com/apache/pulsar/issues/15370
+transactionLogBatchedWriteEnabled=false
+
+# When transaction log batching is enabled, the maximum number of log records in a batch.
+transactionLogBatchedWriteMaxRecords=512
+
+# When transaction log batching is enabled, the maximum size of a batch in bytes (default: 4 MB).
+transactionLogBatchedWriteMaxSize=4194304
+
+# When transaction log batching is enabled, the maximum time (in milliseconds) the first
+# record in a batch waits before the batch is flushed.
+transactionLogBatchedWriteMaxDelayInMillis=1
+
+# Provide a mechanism allowing the Pending Ack Store to aggregate multiple records into a batched record and persist
+# it into a single BookKeeper (BK) entry. This makes Pulsar transactions more efficient (a.k.a. batched log).
+# See: https://github.com/apache/pulsar/issues/15370
+transactionPendingAckBatchedWriteEnabled=false
+
+# When transaction pending ack log batching is enabled, the maximum number of log records
+# in a batch.
+transactionPendingAckBatchedWriteMaxRecords=512
+
+# When transaction pending ack log batching is enabled, the maximum size of a batch in bytes (default: 4 MB).
+transactionPendingAckBatchedWriteMaxSize=4194304
+
+# When transaction pending ack log batching is enabled, the maximum time (in milliseconds) the first
+# record in a batch waits before the batch is flushed.
+transactionPendingAckBatchedWriteMaxDelayInMillis=1
+
### --- Packages management service configuration variables (begin) --- ###
# Enable the packages management service or not
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 666d3a6d72c..c90c04e8eda 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2660,6 +2660,67 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private long transactionPendingAckLogIndexMinLag = 500L;
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            dynamic = true,
+            doc = "Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched"
+                    + " record and persist it into a single BookKeeper (BK) entry. This makes Pulsar transactions more"
+                    + " efficient (a.k.a. batched log). See: https://github.com/apache/pulsar/issues/15370. Default: false."
+    )
+    private boolean transactionLogBatchedWriteEnabled = false;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "When transaction log batching is enabled, the maximum number of log records in a batch."
+                    + " Default: 512."
+    )
+    private int transactionLogBatchedWriteMaxRecords = 512;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "When transaction log batching is enabled, the maximum size of a batch in bytes."
+                    + " Default: 4 MB (4194304 bytes)."
+    )
+    private int transactionLogBatchedWriteMaxSize = 1024 * 1024 * 4;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "When transaction log batching is enabled, the maximum time (in milliseconds) the first record"
+                    + " in a batch waits before the batch is flushed. Default: 1 millisecond."
+    )
+    private int transactionLogBatchedWriteMaxDelayInMillis = 1;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            dynamic = true,
+            doc = "Provide a mechanism allowing the transaction Pending Ack Store to aggregate multiple records"
+                    + " into a batched record and persist it into a single BookKeeper (BK) entry. This makes Pulsar"
+                    + " transactions more efficient (a.k.a. batched log). See: https://github.com/apache/pulsar/issues/15370."
+                    + " Default: false."
+    )
+    private boolean transactionPendingAckBatchedWriteEnabled = false;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "When transaction pending ack log batching is enabled, the maximum number of log records"
+                    + " in a batch. Default: 512."
+    )
+    private int transactionPendingAckBatchedWriteMaxRecords = 512;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "When transaction pending ack log batching is enabled, the maximum size of a batch in bytes."
+                    + " Default: 4 MB (4194304 bytes)."
+    )
+    private int transactionPendingAckBatchedWriteMaxSize = 1024 * 1024 * 4;
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "When transaction pending ack log batching is enabled, the maximum time (in milliseconds) the"
+                    + " first record in a batch waits before the batch is flushed. Default: 1 millisecond."
+    )
+    private int transactionPendingAckBatchedWriteMaxDelayInMillis = 1;
+
     /**** --- KeyStore TLS config variables. --- ****/
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
diff --git a/pulsar-broker/src/main/proto/TransactionPendingAck.proto b/pulsar-broker/src/main/proto/TransactionPendingAck.proto
index fd9bb6a9914..cdf7dec6b25 100644
--- a/pulsar-broker/src/main/proto/TransactionPendingAck.proto
+++ b/pulsar-broker/src/main/proto/TransactionPendingAck.proto
@@ -43,3 +43,8 @@ message PendingAckMetadataEntry {
optional uint64 txnid_most_bits = 4;
repeated PendingAckMetadata pending_ack_metadata = 5;
}
+
+message BatchedPendingAckMetadataEntry {
+    // Pending ack log records aggregated into one batched entry.
+    repeated PendingAckMetadataEntry pending_ack_logs = 1;
+}
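`BatchedPendingAckMetadataEntry` wraps a `repeated PendingAckMetadataEntry`, so one stored entry can carry many pending ack records, and step 3 of the plan makes the stores read such entries back. A hedged sketch of that read-side expansion in plain Java follows. The types `StoredEntry`, `PendingAckLog`, and `BatchedPendingAckLog` are hypothetical stand-ins for the generated protobuf classes (only `txnid_most_bits` is modeled), and how the real reader tells batched and non-batched entries apart is not part of this commit, so the type check below is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical read-side sketch; the types below stand in for generated protobuf classes.
class BatchedReplaySketch {
    // Expands stored entries into logical pending ack records, preserving their order.
    static List<PendingAckLog> replay(List<StoredEntry> storedEntries) {
        List<PendingAckLog> records = new ArrayList<>();
        for (StoredEntry entry : storedEntries) {
            if (entry instanceof BatchedPendingAckLog batch) {
                // One stored entry, many records, appended in their batched order.
                records.addAll(batch.pendingAckLogs());
            } else if (entry instanceof PendingAckLog single) {
                records.add(single);
            } else {
                throw new IllegalArgumentException("Unknown entry type: " + entry);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<StoredEntry> stored = List.of(
                new PendingAckLog(1),
                new BatchedPendingAckLog(List.of(new PendingAckLog(2), new PendingAckLog(3))),
                new PendingAckLog(4));
        System.out.println(replay(stored).size()); // prints 4
    }
}

// Marker for anything read back from the pending ack log (hypothetical).
interface StoredEntry {}

// Stand-in for PendingAckMetadataEntry; only txnid_most_bits is modeled.
record PendingAckLog(long txnidMostBits) implements StoredEntry {}

// Stand-in for BatchedPendingAckMetadataEntry (repeated pending_ack_logs = 1).
record BatchedPendingAckLog(List<PendingAckLog> pendingAckLogs) implements StoredEntry {}
```

Three stored entries expand to four logical records here; the same idea applies to `BatchedTransactionMetadataEntry` and its `transaction_logs` field.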
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index d96c84c0914..95e4ecc577b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -256,4 +256,63 @@ public class ServiceConfigurationTest {
assertEquals(conf.getBookkeeperClientNumIoThreads(), 1);
}
}
+
+    /**
+     * Verify that the transaction batch log configuration is loaded correctly. Covers these cases:
+     * 1. broker.conf, which contains the default values. If a property were missing from the file, the default
+     *    value would be used anyway, so this case cannot verify that the property names are spelled correctly.
+     * 2. pulsar_broker_test.conf, which uses non-default values and so covers the scenario that case 1
+     *    does not.
+     * 3. Properties read from a string input stream, which covers input that does not come from a file.
+     */
+    @Test
+    public void testTransactionBatchConfigurations() throws Exception {
+        ServiceConfiguration configuration = null;
+        // broker.conf.
+        try (FileInputStream inputStream = new FileInputStream("../conf/broker.conf")) {
+            configuration = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
+            assertFalse(configuration.isTransactionLogBatchedWriteEnabled());
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxRecords(), 512);
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxSize(), 1024 * 1024 * 4);
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxDelayInMillis(), 1);
+            assertFalse(configuration.isTransactionPendingAckBatchedWriteEnabled());
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 512);
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1024 * 1024 * 4);
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 1);
+        }
+        // pulsar_broker_test.conf.
+        try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(fileName)) {
+            configuration = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
+            assertTrue(configuration.isTransactionLogBatchedWriteEnabled());
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxRecords(), 11);
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxSize(), 22);
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxDelayInMillis(), 33);
+            assertTrue(configuration.isTransactionPendingAckBatchedWriteEnabled());
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 44);
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 55);
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 66);
+        }
+        // string input stream.
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("transactionLogBatchedWriteEnabled=true").append(System.lineSeparator());
+        stringBuilder.append("transactionLogBatchedWriteMaxRecords=520").append(System.lineSeparator());
+        stringBuilder.append("transactionLogBatchedWriteMaxSize=1024").append(System.lineSeparator());
+        stringBuilder.append("transactionLogBatchedWriteMaxDelayInMillis=11").append(System.lineSeparator());
+        stringBuilder.append("transactionPendingAckBatchedWriteEnabled=true").append(System.lineSeparator());
+        stringBuilder.append("transactionPendingAckBatchedWriteMaxRecords=521").append(System.lineSeparator());
+        stringBuilder.append("transactionPendingAckBatchedWriteMaxSize=1025").append(System.lineSeparator());
+        stringBuilder.append("transactionPendingAckBatchedWriteMaxDelayInMillis=20").append(System.lineSeparator());
+        try (ByteArrayInputStream inputStream =
+                new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))) {
+            configuration = PulsarConfigurationLoader.create(inputStream, ServiceConfiguration.class);
+            assertTrue(configuration.isTransactionLogBatchedWriteEnabled());
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxRecords(), 520);
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxSize(), 1024);
+            assertEquals(configuration.getTransactionLogBatchedWriteMaxDelayInMillis(), 11);
+            assertTrue(configuration.isTransactionPendingAckBatchedWriteEnabled());
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 521);
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1025);
+            assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(), 20);
+        }
+    }
}
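Case 1 in the test Javadoc notes that loading `broker.conf` cannot catch a misspelled property name, because a missing key silently falls back to the field's default. The sketch below illustrates that reasoning with plain `java.util.Properties`. It assumes key-to-default fallback behavior similar to what the test relies on in `PulsarConfigurationLoader`, but it neither uses nor reproduces that class; `DefaultFallbackSketch` and its `maxRecords` helper are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration of default fallback; does not use PulsarConfigurationLoader.
class DefaultFallbackSketch {
    // Mirrors the default of transactionLogBatchedWriteMaxRecords in ServiceConfiguration.
    static final int DEFAULT_MAX_RECORDS = 512;

    // Returns the configured value, or the default when the exact key is absent.
    static int maxRecords(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        String raw = props.getProperty("transactionLogBatchedWriteMaxRecords");
        return raw == null ? DEFAULT_MAX_RECORDS : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        System.out.println(maxRecords("transactionLogBatchedWriteMaxRecords=512\n")); // 512
        System.out.println(maxRecords("transactionLogBatchedWriteMaxRecord=512\n"));  // 512: typo, default used
        System.out.println(maxRecords("transactionLogBatchedWriteMaxRecords=11\n"));  // 11
    }
}
```

A misspelled key and the correct key both yield 512, which is why the test also loads `pulsar_broker_test.conf` with non-default values such as 11.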
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index a21d92d297c..226b2f31a73 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -92,3 +92,13 @@ brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
+
+### --- Transaction config variables --- ###
+transactionLogBatchedWriteEnabled=true
+transactionLogBatchedWriteMaxRecords=11
+transactionLogBatchedWriteMaxSize=22
+transactionLogBatchedWriteMaxDelayInMillis=33
+transactionPendingAckBatchedWriteEnabled=true
+transactionPendingAckBatchedWriteMaxRecords=44
+transactionPendingAckBatchedWriteMaxSize=55
+transactionPendingAckBatchedWriteMaxDelayInMillis=66
diff --git a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
index 828fc724795..6d506d48176 100644
--- a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
+++ b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
@@ -51,4 +51,9 @@ message TransactionMetadataEntry {
optional uint64 start_time = 9;
optional uint64 last_modification_time = 10;
optional uint64 max_local_txn_id = 11;
+}
+
+message BatchedTransactionMetadataEntry {
+    // Transaction log records aggregated into one batched entry.
+    repeated TransactionMetadataEntry transaction_logs = 1;
}
\ No newline at end of file
diff --git a/site2/website/versioned_docs/version-2.3.0/reference-configuration.md b/site2/website/versioned_docs/version-2.3.0/reference-configuration.md
index a44beafd76a..2b91bd79cc8 100644
--- a/site2/website/versioned_docs/version-2.3.0/reference-configuration.md
+++ b/site2/website/versioned_docs/version-2.3.0/reference-configuration.md
@@ -215,6 +215,14 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|s3ManagedLedgerOffloadServiceEndpoint| For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for testing) ||
|s3ManagedLedgerOffloadMaxBlockSizeInBytes| For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) |67108864|
|s3ManagedLedgerOffloadReadBufferSizeInBytes| For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) |1048576|
+|transactionLogBatchedWriteEnabled| Provide a mechanism allowing the Transaction Log Store to aggregate multiple records into a batched record and persist it into a single BookKeeper (BK) entry. This makes Pulsar transactions more efficient (a.k.a. batched log). See: https://github.com/apache/pulsar/issues/15370 |false|
+|transactionLogBatchedWriteMaxRecords| When transaction log batching is enabled, the maximum number of log records in a batch. |512|
+|transactionLogBatchedWriteMaxSize| When transaction log batching is enabled, the maximum size of a batch in bytes (4 MB by default). |4194304|
+|transactionLogBatchedWriteMaxDelayInMillis| When transaction log batching is enabled, the maximum time (in milliseconds) the first record in a batch waits before the batch is flushed. |1|
+|transactionPendingAckBatchedWriteEnabled| Provide a mechanism allowing the Pending Ack Store to aggregate multiple records into a batched record and persist it into a single BookKeeper (BK) entry. This makes Pulsar transactions more efficient (a.k.a. batched log). See: https://github.com/apache/pulsar/issues/15370 |false|
+|transactionPendingAckBatchedWriteMaxRecords| When transaction pending ack log batching is enabled, the maximum number of log records in a batch. |512|
+|transactionPendingAckBatchedWriteMaxSize| When transaction pending ack log batching is enabled, the maximum size of a batch in bytes (4 MB by default). |4194304|
+|transactionPendingAckBatchedWriteMaxDelayInMillis| When transaction pending ack log batching is enabled, the maximum time (in milliseconds) the first record in a batch waits before the batch is flushed. |1|