This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new db2daaeda [core] Introduce row time flag sequence.auto-padding (#1744)
db2daaeda is described below
commit db2daaeda3371ae17f6c9c61e1057a5c6819a556
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 7 17:52:18 2023 +0800
[core] Introduce row time flag sequence.auto-padding (#1744)
---
docs/content/concepts/primary-key-table.md | 30 +++++++-----
.../shortcodes/generated/core_configuration.html | 6 +--
.../main/java/org/apache/paimon/CoreOptions.java | 40 ++++++++++++---
.../table/ChangelogWithKeyFileStoreTable.java | 9 ++--
.../paimon/table/sink/SequenceGenerator.java | 57 ++++++++++++++--------
.../table/ChangelogWithKeyFileStoreTableTest.java | 2 +-
.../paimon/table/sink/SequenceGeneratorTest.java | 43 ++++++++++++++--
7 files changed, 137 insertions(+), 50 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index ab4a9ed16..5f1e77aa9 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -353,19 +353,8 @@ changelog for the same record.
By default, the primary key table determines the merge order according to the
input order (the last input record will be the last to merge). However, in
distributed computing,
there will be some cases that lead to data disorder. At this time, you can use
a time field as `sequence.field`, for example:
-{{< hint info >}}
-When the record is updated or deleted, the `sequence.field` must become larger
and cannot remain unchanged.
-For -U and +U, their sequence-fields must be different.
-
-If the provided `sequence.field` doesn't meet the precision, like a rough
second or millisecond, you can set
-`sequence.auto-padding` to `second-to-micro` or `millis-to-micro` so that the
precision of sequence number will
-be made up to microsecond by system.
-{{< /hint >}}
-
{{< tabs "sequence.field" >}}
-
{{< tab "Flink" >}}
-
```sql
CREATE TABLE MyTable (
pk BIGINT PRIMARY KEY NOT ENFORCED,
@@ -376,9 +365,24 @@ CREATE TABLE MyTable (
'sequence.field' = 'dt'
);
```
-
{{< /tab >}}
-
{{< /tabs >}}
The record with the largest `sequence.field` value will be the last to merge,
regardless of the input order.
+
+**Sequence Auto Padding**:
+
+When the record is updated or deleted, the `sequence.field` must become larger
and cannot remain unchanged.
+For -U and +U, their sequence-fields must be different. If you cannot meet
this requirement, Paimon provides
+an option to automatically pad the sequence field for you.
+
+1. `'sequence.auto-padding' = 'row-kind-flag'`: If you are using the same value
for -U and +U, such as "`op_ts`"
+(the time that the change was made in the database) in the MySQL binlog, it is
recommended to use the automatic
+padding for the row kind flag, which will automatically distinguish between -U
(-D) and +U (+I).
+
+2. Insufficient precision: If the provided `sequence.field` is not precise
enough, for example only accurate to the second or
+millisecond, you can set `sequence.auto-padding` to `second-to-micro` or
`millis-to-micro` so that the system pads
+the sequence number up to microsecond precision.
+
+3. Composite pattern: for example, "second-to-micro,row-kind-flag" first pads
the seconds up to microseconds, and then
+pads the row kind flag.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 01b22cbab..fc9ca8900 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -420,9 +420,9 @@ This config option does not affect the default filesystem
metastore.</td>
</tr>
<tr>
<td><h5>sequence.auto-padding</h5></td>
- <td style="word-wrap: break-word;">none</td>
- <td><p>Enum</p></td>
- <td>Specify the way of padding precision up to micro-second if the
provided sequence field is used to indicate "time" but doesn't meet the
precise.<br /><br />Possible values:<ul><li>"none": No padding for sequence
field.</li><li>"second-to-micro": Pads the sequence field that indicates time
with precision of seconds to micro-second.</li><li>"millis-to-micro": Pads the
sequence field that indicates time with precision of milli-second to
micro-second.</li></ul></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify the way of padding precision, if the provided sequence
field is used to indicate "time" but doesn't meet the precise.<ul><li>You can
specific:</li><li>1. "row-kind-flag": Pads a bit flag to indicate whether it is
retract (0) or add (1) message.</li><li>2. "second-to-micro": Pads the sequence
field that indicates time with precision of seconds to micro-second.</li><li>3.
"millis-to-micro": Pads the sequence field that indicates time with precision
of milli-second t [...]
</tr>
<tr>
<td><h5>sequence.field</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index a56d26d7c..cea7f74a0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
@@ -37,6 +38,7 @@ import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -394,13 +396,25 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for
primary key table,"
+ " the sequence number determines which
data is the most recent.");
- public static final ConfigOption<SequenceAutoPadding>
SEQUENCE_AUTO_PADDING =
+ public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
key("sequence.auto-padding")
- .enumType(SequenceAutoPadding.class)
- .defaultValue(SequenceAutoPadding.NONE)
+ .stringType()
+ .noDefaultValue()
.withDescription(
- "Specify the way of padding precision up to
micro-second"
- + " if the provided sequence field is used
to indicate \"time\" but doesn't meet the precise.");
+ Description.builder()
+ .text(
+ "Specify the way of padding
precision, if the provided sequence field is used to indicate \"time\" but
doesn't meet the precise.")
+ .list(
+ text("You can specific:"),
+ text(
+ "1. \"row-kind-flag\":
Pads a bit flag to indicate whether it is retract (0) or add (1) message."),
+ text(
+ "2. \"second-to-micro\":
Pads the sequence field that indicates time with precision of seconds to
micro-second."),
+ text(
+ "3. \"millis-to-micro\":
Pads the sequence field that indicates time with precision of milli-second to
micro-second."),
+ text(
+ "4. Composite pattern: for
example, \"second-to-micro,row-kind-flag\"."))
+ .build());
public static final ConfigOption<StartupMode> SCAN_MODE =
key("scan.mode")
@@ -1113,8 +1127,12 @@ public class CoreOptions implements Serializable {
return options.getOptional(SEQUENCE_FIELD);
}
- public SequenceAutoPadding sequenceAutoPadding() {
- return options.get(SEQUENCE_AUTO_PADDING);
+ public List<String> sequenceAutoPadding() {
+ String padding = options.get(SEQUENCE_AUTO_PADDING);
+ if (padding == null) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(padding.split(","));
}
public WriteMode writeMode() {
@@ -1627,7 +1645,9 @@ public class CoreOptions implements Serializable {
/** Specifies the way of making up time precision for sequence field. */
public enum SequenceAutoPadding implements DescribedEnum {
- NONE("none", "No padding for sequence field."),
+ ROW_KIND_FLAG(
+ "row-kind-flag",
+ "Pads a bit flag to indicate whether it is retract (0) or add
(1) message."),
SECOND_TO_MICRO(
"second-to-micro",
"Pads the sequence field that indicates time with precision of
seconds to micro-second."),
@@ -1652,6 +1672,10 @@ public class CoreOptions implements Serializable {
public InlineElement getDescription() {
return text(description);
}
+
+ public static SequenceAutoPadding fromString(String s) {
+ return OptionsUtils.convertToEnum(s, SequenceAutoPadding.class);
+ }
}
/** The mode for tag creation. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 9e0157103..a9b7fad2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.SequenceAutoPadding;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.WriteMode;
@@ -248,8 +249,10 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
.sequenceField()
.map(field -> new SequenceGenerator(field,
schema().logicalRowType()))
.orElse(null);
- final CoreOptions.SequenceAutoPadding sequenceAutoPadding =
- store().options().sequenceAutoPadding();
+ final List<SequenceAutoPadding> sequenceAutoPadding =
+ store().options().sequenceAutoPadding().stream()
+ .map(SequenceAutoPadding::fromString)
+ .collect(Collectors.toList());
final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store().newWrite(commitUser, manifestFilter),
@@ -258,7 +261,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
long sequenceNumber =
sequenceGenerator == null
? KeyValue.UNKNOWN_SEQUENCE
- : sequenceAutoPadding ==
CoreOptions.SequenceAutoPadding.NONE
+ : sequenceAutoPadding.isEmpty()
?
sequenceGenerator.generate(record.row())
:
sequenceGenerator.generateWithPadding(
record.row(),
sequenceAutoPadding);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index 0fcf6c337..f39f62c05 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -18,7 +18,7 @@
package org.apache.paimon.table.sink;
-import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SequenceAutoPadding;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.CharType;
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimestampType;
@@ -40,6 +41,7 @@ import org.apache.paimon.utils.InternalRowUtils;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/** Generate sequence number. */
@@ -86,35 +88,52 @@ public class SequenceGenerator {
return generator.generate(row, index);
}
- public long generateWithPadding(InternalRow row,
CoreOptions.SequenceAutoPadding autoPadding) {
- switch (autoPadding) {
- case SECOND_TO_MICRO:
- long value = generate(row);
- // timestamp returns millis
- long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? value /
1000 : value;
- return second * 1_000_000 + getCurrentMicroOfSeconds();
- case MILLIS_TO_MICRO:
- // Generated value is millis
- long millis = generate(row);
- return millis * 1_000 + getCurrentMicroOfMillis();
- default:
- throw new UnsupportedOperationException(
- "Unknown sequence padding mode " + autoPadding.name());
+ public long generateWithPadding(InternalRow row, List<SequenceAutoPadding>
paddings) {
+ long sequence = generate(row);
+ for (SequenceAutoPadding padding : paddings) {
+ switch (padding) {
+ case ROW_KIND_FLAG:
+ sequence = addRowKindFlag(sequence, row.getRowKind());
+ break;
+ case SECOND_TO_MICRO:
+ sequence = secondToMicro(sequence);
+ break;
+ case MILLIS_TO_MICRO:
+ sequence = millisToMicro(sequence);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown sequence padding mode " + padding);
+ }
}
+ return sequence;
+ }
+
+ private long addRowKindFlag(long sequence, RowKind rowKind) {
+ return (sequence << 1) | (rowKind.isAdd() ? 1 : 0);
+ }
+
+ private long millisToMicro(long sequence) {
+ // Generated value is millis
+ return sequence * 1_000 + getCurrentMicroOfMillis();
+ }
+
+ private long secondToMicro(long sequence) {
+ // timestamp returns millis
+ long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000
: sequence;
+ return second * 1_000_000 + getCurrentMicroOfSeconds();
}
private static long getCurrentMicroOfMillis() {
long currentNanoTime = System.nanoTime();
long mills = TimeUnit.MILLISECONDS.convert(currentNanoTime,
TimeUnit.NANOSECONDS);
- long microOfMillis = (currentNanoTime - mills * 1_000_000) / 1000;
- return microOfMillis;
+ return (currentNanoTime - mills * 1_000_000) / 1000;
}
private static long getCurrentMicroOfSeconds() {
long currentNanoTime = System.nanoTime();
long seconds = TimeUnit.SECONDS.convert(currentNanoTime,
TimeUnit.NANOSECONDS);
- long microOfSecs = (currentNanoTime - seconds * 1_000_000_000) / 1000;
- return microOfSecs;
+ return (currentNanoTime - seconds * 1_000_000_000) / 1000;
}
private interface Generator {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 06673566a..ce901586c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -179,7 +179,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
conf.set(CoreOptions.SEQUENCE_FIELD, "sec");
conf.set(
CoreOptions.SEQUENCE_AUTO_PADDING,
-
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO);
+
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO.toString());
},
rowType);
StreamTableWrite write = table.newWrite(commitUser);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
index 15c22bfc8..d656d1e18 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
@@ -18,7 +18,6 @@
package org.apache.paimon.table.sink;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
@@ -28,13 +27,19 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
+import java.time.LocalDateTime;
+import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import static
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
+import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG;
+import static
org.apache.paimon.CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -213,6 +218,24 @@ public class SequenceGeneratorTest {
assertUnsupportedDatatype("_multiset");
}
+ @Test
+ public void testGenerateWithPaddingRowKind() {
+ assertThat(generateWithPaddingOnRowKind(1L,
RowKind.INSERT)).isEqualTo(3);
+ assertThat(generateWithPaddingOnRowKind(1L,
RowKind.UPDATE_AFTER)).isEqualTo(3);
+ assertThat(generateWithPaddingOnRowKind(1L,
RowKind.UPDATE_BEFORE)).isEqualTo(2);
+ assertThat(generateWithPaddingOnRowKind(1L,
RowKind.DELETE)).isEqualTo(2);
+
+ long maxMicros =
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("5000-01-01T00:00:00")).toMicros();
+ assertThat(generateWithPaddingOnRowKind(maxMicros, RowKind.INSERT))
+ .isEqualTo(191235168000000001L);
+
+ assertThat(generateWithPaddingOnMicrosAndRowKind(1L, RowKind.INSERT))
+ .isBetween(2001L, 3999L);
+ assertThat(generateWithPaddingOnMicrosAndRowKind(1L,
RowKind.UPDATE_BEFORE))
+ .isBetween(2000L, 3998L);
+ }
+
private SequenceGenerator getGenerator(String field) {
return new SequenceGenerator(field, ALL_DATA_TYPE);
}
@@ -224,7 +247,7 @@ public class SequenceGeneratorTest {
private long generateWithPaddingOnSecond(String field) {
return getGenerator(field)
- .generateWithPadding(row,
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO);
+ .generateWithPadding(row,
Collections.singletonList(SECOND_TO_MICRO));
}
private long getSecondFromGeneratedWithPadding(long generated) {
@@ -233,7 +256,21 @@ public class SequenceGeneratorTest {
private long generateWithPaddingOnMillis(String field) {
return getGenerator(field)
- .generateWithPadding(row,
CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO);
+ .generateWithPadding(row,
Collections.singletonList(MILLIS_TO_MICRO));
+ }
+
+ private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
+ return getGenerator("_bigint")
+ .generateWithPadding(
+ GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
+ Collections.singletonList(ROW_KIND_FLAG));
+ }
+
+ private long generateWithPaddingOnMicrosAndRowKind(long sequence, RowKind
rowKind) {
+ return getGenerator("_bigint")
+ .generateWithPadding(
+ GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
+ Arrays.asList(MILLIS_TO_MICRO, ROW_KIND_FLAG));
}
private long getMillisFromGeneratedWithPadding(long generated) {