This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new db2daaeda [core] Introduce row time flag sequence.auto-padding (#1744)
db2daaeda is described below

commit db2daaeda3371ae17f6c9c61e1057a5c6819a556
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 7 17:52:18 2023 +0800

    [core] Introduce row time flag sequence.auto-padding (#1744)
---
 docs/content/concepts/primary-key-table.md         | 30 +++++++-----
 .../shortcodes/generated/core_configuration.html   |  6 +--
 .../main/java/org/apache/paimon/CoreOptions.java   | 40 ++++++++++++---
 .../table/ChangelogWithKeyFileStoreTable.java      |  9 ++--
 .../paimon/table/sink/SequenceGenerator.java       | 57 ++++++++++++++--------
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  2 +-
 .../paimon/table/sink/SequenceGeneratorTest.java   | 43 ++++++++++++++--
 7 files changed, 137 insertions(+), 50 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index ab4a9ed16..5f1e77aa9 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -353,19 +353,8 @@ changelog for the same record.
 By default, the primary key table determines the merge order according to the 
input order (the last input record will be the last to merge). However, in 
distributed computing,
 there will be some cases that lead to data disorder. At this time, you can use 
a time field as `sequence.field`, for example:
 
-{{< hint info >}}
-When the record is updated or deleted, the `sequence.field` must become larger 
and cannot remain unchanged. 
-For -U and +U, their sequence-fields must be different.
-
-If the provided `sequence.field` doesn't meet the precision, like a rough 
second or millisecond, you can set
-`sequence.auto-padding` to `second-to-micro` or `millis-to-micro` so that the 
precision of sequence number will
-be made up to microsecond by system. 
-{{< /hint >}}
-
 {{< tabs "sequence.field" >}}
-
 {{< tab "Flink" >}}
-
 ```sql
 CREATE TABLE MyTable (
     pk BIGINT PRIMARY KEY NOT ENFORCED,
@@ -376,9 +365,24 @@ CREATE TABLE MyTable (
     'sequence.field' = 'dt'
 );
 ```
-
 {{< /tab >}}
-
 {{< /tabs >}}
 
 The record with the largest `sequence.field` value will be the last to merge, 
regardless of the input order.
+
+**Sequence Auto Padding**:
+
+When the record is updated or deleted, the `sequence.field` must become larger 
and cannot remain unchanged.
+For -U and +U, their sequence-fields must be different. If you cannot meet 
this requirement, Paimon provides
+an option to automatically pad the sequence field for you.
+
+1. `'sequence.auto-padding' = 'row-kind-flag'`: If you are using the same value 
for -U and +U, like "`op_ts`"
+(the time that the change was made in the database) in the MySQL binlog, it is 
recommended to use the automatic
+padding for row kind flag, which will automatically distinguish between -U 
(-D) and +U (+I).
+
+2. Insufficient precision: If the provided `sequence.field` doesn't meet the 
required precision, for example a coarse second or
+millisecond value, you can set `sequence.auto-padding` to `second-to-micro` or 
`millis-to-micro` so that the system
+pads the sequence number up to microsecond precision.
+
+3. Composite pattern: for example, "second-to-micro,row-kind-flag" first pads 
the seconds to microsecond precision, and then
+pads the row kind flag.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 01b22cbab..fc9ca8900 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -420,9 +420,9 @@ This config option does not affect the default filesystem 
metastore.</td>
         </tr>
         <tr>
             <td><h5>sequence.auto-padding</h5></td>
-            <td style="word-wrap: break-word;">none</td>
-            <td><p>Enum</p></td>
-            <td>Specify the way of padding precision up to micro-second if the 
provided sequence field is used to indicate "time" but doesn't meet the 
precise.<br /><br />Possible values:<ul><li>"none": No padding for sequence 
field.</li><li>"second-to-micro": Pads the sequence field that indicates time 
with precision of seconds to micro-second.</li><li>"millis-to-micro": Pads the 
sequence field that indicates time with precision of milli-second to 
micro-second.</li></ul></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specify the way of padding precision, if the provided sequence 
field is used to indicate "time" but doesn't meet the precise.<ul><li>You can 
specific:</li><li>1. "row-kind-flag": Pads a bit flag to indicate whether it is 
retract (0) or add (1) message.</li><li>2. "second-to-micro": Pads the sequence 
field that indicates time with precision of seconds to micro-second.</li><li>3. 
"millis-to-micro": Pads the sequence field that indicates time with precision 
of milli-second t [...]
         </tr>
         <tr>
             <td><h5>sequence.field</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index a56d26d7c..cea7f74a0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.options.OptionsUtils;
 import org.apache.paimon.options.description.DescribedEnum;
 import org.apache.paimon.options.description.Description;
 import org.apache.paimon.options.description.InlineElement;
@@ -37,6 +38,7 @@ import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -394,13 +396,25 @@ public class CoreOptions implements Serializable {
                             "The field that generates the sequence number for 
primary key table,"
                                     + " the sequence number determines which 
data is the most recent.");
 
-    public static final ConfigOption<SequenceAutoPadding> 
SEQUENCE_AUTO_PADDING =
+    public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
             key("sequence.auto-padding")
-                    .enumType(SequenceAutoPadding.class)
-                    .defaultValue(SequenceAutoPadding.NONE)
+                    .stringType()
+                    .noDefaultValue()
                     .withDescription(
-                            "Specify the way of padding precision up to 
micro-second"
-                                    + " if the provided sequence field is used 
to indicate \"time\" but doesn't meet the precise.");
+                            Description.builder()
+                                    .text(
+                                            "Specify the way of padding 
precision, if the provided sequence field is used to indicate \"time\" but 
doesn't meet the precise.")
+                                    .list(
+                                            text("You can specific:"),
+                                            text(
+                                                    "1. \"row-kind-flag\": 
Pads a bit flag to indicate whether it is retract (0) or add (1) message."),
+                                            text(
+                                                    "2. \"second-to-micro\": 
Pads the sequence field that indicates time with precision of seconds to 
micro-second."),
+                                            text(
+                                                    "3. \"millis-to-micro\": 
Pads the sequence field that indicates time with precision of milli-second to 
micro-second."),
+                                            text(
+                                                    "4. Composite pattern: for 
example, \"second-to-micro,row-kind-flag\"."))
+                                    .build());
 
     public static final ConfigOption<StartupMode> SCAN_MODE =
             key("scan.mode")
@@ -1113,8 +1127,12 @@ public class CoreOptions implements Serializable {
         return options.getOptional(SEQUENCE_FIELD);
     }
 
-    public SequenceAutoPadding sequenceAutoPadding() {
-        return options.get(SEQUENCE_AUTO_PADDING);
+    public List<String> sequenceAutoPadding() {
+        String padding = options.get(SEQUENCE_AUTO_PADDING);
+        if (padding == null) {
+            return Collections.emptyList();
+        }
+        return Arrays.asList(padding.split(","));
     }
 
     public WriteMode writeMode() {
@@ -1627,7 +1645,9 @@ public class CoreOptions implements Serializable {
 
     /** Specifies the way of making up time precision for sequence field. */
     public enum SequenceAutoPadding implements DescribedEnum {
-        NONE("none", "No padding for sequence field."),
+        ROW_KIND_FLAG(
+                "row-kind-flag",
+                "Pads a bit flag to indicate whether it is retract (0) or add 
(1) message."),
         SECOND_TO_MICRO(
                 "second-to-micro",
                 "Pads the sequence field that indicates time with precision of 
seconds to micro-second."),
@@ -1652,6 +1672,10 @@ public class CoreOptions implements Serializable {
         public InlineElement getDescription() {
             return text(description);
         }
+
+        public static SequenceAutoPadding fromString(String s) {
+            return OptionsUtils.convertToEnum(s, SequenceAutoPadding.class);
+        }
     }
 
     /** The mode for tag creation. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 9e0157103..a9b7fad2e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.SequenceAutoPadding;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.WriteMode;
@@ -248,8 +249,10 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                         .sequenceField()
                         .map(field -> new SequenceGenerator(field, 
schema().logicalRowType()))
                         .orElse(null);
-        final CoreOptions.SequenceAutoPadding sequenceAutoPadding =
-                store().options().sequenceAutoPadding();
+        final List<SequenceAutoPadding> sequenceAutoPadding =
+                store().options().sequenceAutoPadding().stream()
+                        .map(SequenceAutoPadding::fromString)
+                        .collect(Collectors.toList());
         final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store().newWrite(commitUser, manifestFilter),
@@ -258,7 +261,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                     long sequenceNumber =
                             sequenceGenerator == null
                                     ? KeyValue.UNKNOWN_SEQUENCE
-                                    : sequenceAutoPadding == 
CoreOptions.SequenceAutoPadding.NONE
+                                    : sequenceAutoPadding.isEmpty()
                                             ? 
sequenceGenerator.generate(record.row())
                                             : 
sequenceGenerator.generateWithPadding(
                                                     record.row(), 
sequenceAutoPadding);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index 0fcf6c337..f39f62c05 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.table.sink;
 
-import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SequenceAutoPadding;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.CharType;
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DoubleType;
 import org.apache.paimon.types.FloatType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.SmallIntType;
 import org.apache.paimon.types.TimestampType;
@@ -40,6 +41,7 @@ import org.apache.paimon.utils.InternalRowUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /** Generate sequence number. */
@@ -86,35 +88,52 @@ public class SequenceGenerator {
         return generator.generate(row, index);
     }
 
-    public long generateWithPadding(InternalRow row, 
CoreOptions.SequenceAutoPadding autoPadding) {
-        switch (autoPadding) {
-            case SECOND_TO_MICRO:
-                long value = generate(row);
-                // timestamp returns millis
-                long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? value / 
1000 : value;
-                return second * 1_000_000 + getCurrentMicroOfSeconds();
-            case MILLIS_TO_MICRO:
-                // Generated value is millis
-                long millis = generate(row);
-                return millis * 1_000 + getCurrentMicroOfMillis();
-            default:
-                throw new UnsupportedOperationException(
-                        "Unknown sequence padding mode " + autoPadding.name());
+    public long generateWithPadding(InternalRow row, List<SequenceAutoPadding> 
paddings) {
+        long sequence = generate(row);
+        for (SequenceAutoPadding padding : paddings) {
+            switch (padding) {
+                case ROW_KIND_FLAG:
+                    sequence = addRowKindFlag(sequence, row.getRowKind());
+                    break;
+                case SECOND_TO_MICRO:
+                    sequence = secondToMicro(sequence);
+                    break;
+                case MILLIS_TO_MICRO:
+                    sequence = millisToMicro(sequence);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown sequence padding mode " + padding);
+            }
         }
+        return sequence;
+    }
+
+    private long addRowKindFlag(long sequence, RowKind rowKind) {
+        return (sequence << 1) | (rowKind.isAdd() ? 1 : 0);
+    }
+
+    private long millisToMicro(long sequence) {
+        // Generated value is millis
+        return sequence * 1_000 + getCurrentMicroOfMillis();
+    }
+
+    private long secondToMicro(long sequence) {
+        // timestamp returns millis
+        long second = fieldType.is(DataTypeFamily.TIMESTAMP) ? sequence / 1000 
: sequence;
+        return second * 1_000_000 + getCurrentMicroOfSeconds();
     }
 
     private static long getCurrentMicroOfMillis() {
         long currentNanoTime = System.nanoTime();
         long mills = TimeUnit.MILLISECONDS.convert(currentNanoTime, 
TimeUnit.NANOSECONDS);
-        long microOfMillis = (currentNanoTime - mills * 1_000_000) / 1000;
-        return microOfMillis;
+        return (currentNanoTime - mills * 1_000_000) / 1000;
     }
 
     private static long getCurrentMicroOfSeconds() {
         long currentNanoTime = System.nanoTime();
         long seconds = TimeUnit.SECONDS.convert(currentNanoTime, 
TimeUnit.NANOSECONDS);
-        long microOfSecs = (currentNanoTime - seconds * 1_000_000_000) / 1000;
-        return microOfSecs;
+        return (currentNanoTime - seconds * 1_000_000_000) / 1000;
     }
 
     private interface Generator {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 06673566a..ce901586c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -179,7 +179,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                             conf.set(CoreOptions.SEQUENCE_FIELD, "sec");
                             conf.set(
                                     CoreOptions.SEQUENCE_AUTO_PADDING,
-                                    
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO);
+                                    
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO.toString());
                         },
                         rowType);
         StreamTableWrite write = table.newWrite(commitUser);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
index 15c22bfc8..d656d1e18 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/SequenceGeneratorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.table.sink;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
@@ -28,13 +27,19 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.LocalDateTime;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.paimon.CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO;
+import static org.apache.paimon.CoreOptions.SequenceAutoPadding.ROW_KIND_FLAG;
+import static 
org.apache.paimon.CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -213,6 +218,24 @@ public class SequenceGeneratorTest {
         assertUnsupportedDatatype("_multiset");
     }
 
+    @Test
+    public void testGenerateWithPaddingRowKind() {
+        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.INSERT)).isEqualTo(3);
+        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.UPDATE_AFTER)).isEqualTo(3);
+        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.UPDATE_BEFORE)).isEqualTo(2);
+        assertThat(generateWithPaddingOnRowKind(1L, 
RowKind.DELETE)).isEqualTo(2);
+
+        long maxMicros =
+                
Timestamp.fromLocalDateTime(LocalDateTime.parse("5000-01-01T00:00:00")).toMicros();
+        assertThat(generateWithPaddingOnRowKind(maxMicros, RowKind.INSERT))
+                .isEqualTo(191235168000000001L);
+
+        assertThat(generateWithPaddingOnMicrosAndRowKind(1L, RowKind.INSERT))
+                .isBetween(2001L, 3999L);
+        assertThat(generateWithPaddingOnMicrosAndRowKind(1L, 
RowKind.UPDATE_BEFORE))
+                .isBetween(2000L, 3998L);
+    }
+
     private SequenceGenerator getGenerator(String field) {
         return new SequenceGenerator(field, ALL_DATA_TYPE);
     }
@@ -224,7 +247,7 @@ public class SequenceGeneratorTest {
 
     private long generateWithPaddingOnSecond(String field) {
         return getGenerator(field)
-                .generateWithPadding(row, 
CoreOptions.SequenceAutoPadding.SECOND_TO_MICRO);
+                .generateWithPadding(row, 
Collections.singletonList(SECOND_TO_MICRO));
     }
 
     private long getSecondFromGeneratedWithPadding(long generated) {
@@ -233,7 +256,21 @@ public class SequenceGeneratorTest {
 
     private long generateWithPaddingOnMillis(String field) {
         return getGenerator(field)
-                .generateWithPadding(row, 
CoreOptions.SequenceAutoPadding.MILLIS_TO_MICRO);
+                .generateWithPadding(row, 
Collections.singletonList(MILLIS_TO_MICRO));
+    }
+
+    private long generateWithPaddingOnRowKind(long sequence, RowKind rowKind) {
+        return getGenerator("_bigint")
+                .generateWithPadding(
+                        GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
+                        Collections.singletonList(ROW_KIND_FLAG));
+    }
+
+    private long generateWithPaddingOnMicrosAndRowKind(long sequence, RowKind 
rowKind) {
+        return getGenerator("_bigint")
+                .generateWithPadding(
+                        GenericRow.ofKind(rowKind, 0, 0, 0, 0, 0, 0, sequence),
+                        Arrays.asList(MILLIS_TO_MICRO, ROW_KIND_FLAG));
     }
 
     private long getMillisFromGeneratedWithPadding(long generated) {

Reply via email to