This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a3fc947d5 [core] Add immutable id for system field (#4227)
a3fc947d5 is described below

commit a3fc947d5a4916b6626a783c39104154a70bf257
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Sep 23 17:12:00 2024 +0800

    [core] Add immutable id for system field (#4227)
---
 .../java/org/apache/paimon/table/SystemFields.java | 52 ++++++++++++++++
 .../main/java/org/apache/paimon/types/RowType.java |  6 +-
 .../src/main/java/org/apache/paimon/KeyValue.java  | 69 ++++------------------
 .../apache/paimon/schema/SchemaEvolutionUtil.java  |  9 +--
 .../org/apache/paimon/schema/SchemaValidation.java |  4 +-
 .../org/apache/paimon/schema/SystemColumns.java    | 36 -----------
 .../apache/paimon/table/PrimaryKeyTableUtils.java  |  5 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java | 14 +----
 .../paimon/operation/MergeFileSplitReadTest.java   |  2 +-
 .../apache/paimon/table/SchemaEvolutionTest.java   | 21 +------
 .../paimon/flink/sink/LocalMergeOperator.java      |  4 +-
 11 files changed, 85 insertions(+), 137 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java b/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java
new file mode 100644
index 000000000..856e61de6
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** System fields. */
+public class SystemFields {
+
+    public static final int SYSTEM_FIELD_ID_START = Integer.MAX_VALUE / 2;
+
+    public static final String KEY_FIELD_PREFIX = "_KEY_";
+    public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START;
+
+    public static final DataField SEQUENCE_NUMBER =
+            new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull());
+
+    public static final DataField VALUE_KIND =
+            new DataField(Integer.MAX_VALUE - 2, "_VALUE_KIND", DataTypes.TINYINT().notNull());
+
+    public static final DataField LEVEL =
+            new DataField(Integer.MAX_VALUE - 3, "_LEVEL", DataTypes.INT().notNull());
+
+    public static final Set<String> SYSTEM_FIELD_NAMES =
+            Stream.of(SEQUENCE_NUMBER.name(), VALUE_KIND.name(), LEVEL.name())
+                    .collect(Collectors.toSet());
+
+    public static boolean isSystemField(int fieldId) {
+        return fieldId >= SYSTEM_FIELD_ID_START;
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 0fa6a9c1b..dc7c7ae32 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -20,6 +20,7 @@ package org.apache.paimon.types;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.SystemFields;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
@@ -256,7 +257,10 @@ public final class RowType extends DataType {
     public static int currentHighestFieldId(List<DataField> fields) {
         Set<Integer> fieldIds = new HashSet<>();
         new RowType(fields).collectFieldIds(fieldIds);
-        return fieldIds.stream().max(Integer::compareTo).orElse(-1);
+        return fieldIds.stream()
+                .filter(i -> !SystemFields.isSystemField(i))
+                .max(Integer::compareTo)
+                .orElse(-1);
     }
 
     public static Builder builder() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index 31d3749bf..f2a6c0bde 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -21,12 +21,9 @@ package org.apache.paimon;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.utils.InternalRowUtils;
 
 import java.util.ArrayList;
@@ -34,10 +31,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.paimon.schema.SystemColumns.LEVEL;
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
-import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
-import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.table.SystemFields.LEVEL;
+import static org.apache.paimon.table.SystemFields.SEQUENCE_NUMBER;
+import static org.apache.paimon.table.SystemFields.VALUE_KIND;
 
 /**
  * A key value, including user key, sequence number, value kind and value. This object can be
@@ -115,70 +111,29 @@ public class KeyValue {
     }
 
     public static RowType schema(RowType keyType, RowType valueType) {
-        List<DataField> fields = new ArrayList<>(keyType.getFields());
-        fields.add(new DataField(0, SEQUENCE_NUMBER, new BigIntType(false)));
-        fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false)));
-        fields.addAll(valueType.getFields());
-        return new RowType(fields);
+        return new RowType(createKeyValueFields(keyType.getFields(), valueType.getFields()));
     }
 
     public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
-        RowType.Builder builder = RowType.builder();
-        schema(keyType, valueType)
-                .getFields()
-                .forEach(f -> builder.field(f.name(), f.type(), f.description()));
-        builder.field(LEVEL, DataTypes.INT().notNull());
-        return builder.build();
+        List<DataField> fields = new ArrayList<>(schema(keyType, valueType).getFields());
+        fields.add(LEVEL);
+        return new RowType(fields);
     }
 
     /**
-     * Create key-value fields, we need to add a const value to the id of value field to ensure that
-     * they are consistent when compared by field id. For example, there are two table with key
-     * value fields as follows
-     *
-     * <ul>
-     *   <li>Table1 key fields: 1->a, 2->b, 3->c; value fields: 0->value_count
-     *   <li>Table2 key fields: 1->c, 3->d, 4->a, 5->b; value fields: 0->value_count
-     * </ul>
-     *
-     * <p>We will use 5 as maxKeyId, and create fields for Table1/Table2 as follows
-     *
-     * <ul>
-     *   <li>Table1 fields: 1->a, 2->b, 3->c, 6->seq, 7->kind, 8->value_count
-     *   <li>Table2 fields: 1->c, 3->d, 4->a, 5->b, 6->seq, 7->kind, 8->value_count
-     * </ul>
-     *
-     * <p>Then we can compare these two table fields with the field id.
+     * Create key-value fields.
      *
      * @param keyFields the key fields
      * @param valueFields the value fields
-     * @param maxKeyId the max key id
      * @return the table fields
      */
     public static List<DataField> createKeyValueFields(
-            List<DataField> keyFields, List<DataField> valueFields, final int maxKeyId) {
-        checkState(maxKeyId >= keyFields.stream().mapToInt(DataField::id).max().orElse(0));
-
+            List<DataField> keyFields, List<DataField> valueFields) {
         List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);
         fields.addAll(keyFields);
-        fields.add(
-                new DataField(
-                        maxKeyId + 1,
-                        SEQUENCE_NUMBER,
-                        new org.apache.paimon.types.BigIntType(false)));
-        fields.add(
-                new DataField(
-                        maxKeyId + 2, VALUE_KIND, new org.apache.paimon.types.TinyIntType(false)));
-        for (DataField valueField : valueFields) {
-            DataField newValueField =
-                    new DataField(
-                            valueField.id() + maxKeyId + 3,
-                            valueField.name(),
-                            valueField.type(),
-                            valueField.description());
-            fields.add(newValueField);
-        }
-
+        fields.add(SEQUENCE_NUMBER);
+        fields.add(VALUE_KIND);
+        fields.addAll(valueFields);
         return fields;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index 6b724c74f..25b8cc63f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -217,14 +217,9 @@ public class SchemaEvolutionUtil {
             int[] dataProjection,
             List<DataField> dataKeyFields,
             List<DataField> dataValueFields) {
-        int maxKeyId =
-                Math.max(
-                        tableKeyFields.stream().mapToInt(DataField::id).max().orElse(0),
-                        dataKeyFields.stream().mapToInt(DataField::id).max().orElse(0));
         List<DataField> tableFields =
-                KeyValue.createKeyValueFields(tableKeyFields, tableValueFields, maxKeyId);
-        List<DataField> dataFields =
-                KeyValue.createKeyValueFields(dataKeyFields, dataValueFields, maxKeyId);
+                KeyValue.createKeyValueFields(tableKeyFields, tableValueFields);
+        List<DataField> dataFields = KeyValue.createKeyValueFields(dataKeyFields, dataValueFields);
         return createIndexCastMapping(tableProjection, tableFields, dataProjection, dataFields);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 72ea96024..c714b90d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -68,8 +68,8 @@ import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
 import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
-import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
+import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
 import static org.apache.paimon.types.DataTypeRoot.ARRAY;
 import static org.apache.paimon.types.DataTypeRoot.MAP;
 import static org.apache.paimon.types.DataTypeRoot.MULTISET;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
deleted file mode 100644
index f6350f44a..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.schema;
-
-import java.util.Arrays;
-import java.util.List;
-
-/** System columns for key value store. */
-public class SystemColumns {
-
-    /** System field names. */
-    public static final String KEY_FIELD_PREFIX = "_KEY_";
-
-    public static final String VALUE_COUNT = "_VALUE_COUNT";
-    public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
-    public static final String VALUE_KIND = "_VALUE_KIND";
-    public static final String LEVEL = "_LEVEL";
-    public static final List<String> SYSTEM_FIELD_NAMES =
-            Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index c74ab1f1f..58cde07c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -34,7 +34,8 @@ import org.apache.paimon.types.RowType;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_ID_START;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
 
 /** Utils for creating changelog table with primary keys. */
 public class PrimaryKeyTableUtils {
@@ -45,7 +46,7 @@ public class PrimaryKeyTableUtils {
 
     public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
         return keyFields.stream()
-                .map(f -> f.newName(KEY_FIELD_PREFIX + f.name()))
+                .map(f -> f.newName(KEY_FIELD_PREFIX + f.name()).newId(f.id() + KEY_FIELD_ID_START))
                 .collect(Collectors.toList());
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 80080516e..f2a9c44dd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -145,7 +145,7 @@ public abstract class MergeTreeTestBase {
                 options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1);
         this.options = new CoreOptions(options);
         RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
-        RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));
+        RowType valueType = new RowType(singletonList(new DataField(1, "v", new IntType())));
 
         String identifier = "avro";
         FileFormat flushingAvro = new FlushingFileFormat(identifier);
@@ -161,20 +161,12 @@ public abstract class MergeTreeTestBase {
                         new KeyValueFieldsExtractor() {
                             @Override
                             public List<DataField> keyFields(TableSchema schema) {
-                                return Collections.singletonList(
-                                        new DataField(
-                                                0,
-                                                "k",
-                                                new org.apache.paimon.types.IntType(false)));
+                                return keyType.getFields();
                             }
 
                             @Override
                             public List<DataField> valueFields(TableSchema schema) {
-                                return Collections.singletonList(
-                                        new DataField(
-                                                0,
-                                                "v",
-                                                new org.apache.paimon.types.IntType(false)));
+                                return valueType.getFields();
                             }
                         },
                         new CoreOptions(new HashMap<>()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 1794e8aea..6e9a77398 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -128,7 +128,7 @@ public class MergeFileSplitReadTest {
                             public List<DataField> valueFields(TableSchema schema) {
                                 return Collections.singletonList(
                                         new DataField(
-                                                0,
+                                                3,
                                                 "count",
                                                 new org.apache.paimon.types.BigIntType()));
                             }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 8771377f2..2b20b25c0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -57,8 +57,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
-import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
+import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
 import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -424,23 +424,6 @@ public class SchemaEvolutionTest {
 
     @Test
     public void testCreateAlterSystemField() throws Exception {
-        Schema schema1 =
-                new Schema(
-                        RowType.of(
-                                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
-                                        new String[] {"f0", "_VALUE_COUNT"})
-                                .getFields(),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        new HashMap<>(),
-                        "");
-        assertThatThrownBy(() -> schemaManager.createTable(schema1))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessage(
-                        String.format(
-                                "Field name[%s] in schema cannot be exist in %s",
-                                "_VALUE_COUNT", SYSTEM_FIELD_NAMES));
-
         Schema schema2 =
                 new Schema(
                         RowType.of(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 18d440dbc..aba891e44 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -48,6 +48,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.List;
 
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
+
 /**
  * {@link AbstractStreamOperator} which buffer input record and apply merge function when the buffer
  * is full. Mainly to resolve data skew on primary keys.
@@ -105,7 +107,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
                                     // partition fields.
                                     @Override
                                     public List<DataField> keyFields(TableSchema schema) {
-                                        return schema.primaryKeysFields();
+                                        return addKeyNamePrefix(schema.primaryKeysFields());
                                     }
 
                                     @Override

Reply via email to