This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4322595df [FLINK-31558] Support updating schema type with restrictions in CDC sink
4322595df is described below

commit 4322595dfc1809199e3a815c7fa82ccb0750165a
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 22 20:02:26 2023 +0800

    [FLINK-31558] Support updating schema type with restrictions in CDC sink
    
    This closes #683.
---
 .../java/org/apache/paimon/utils/TypeUtils.java    |   8 +-
 .../sink/cdc/SchemaAwareStoreWriteOperator.java    |  44 +++--
 .../sink/cdc/SchemaChangeProcessFunction.java      | 100 ++++++++++--
 .../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java  |  36 ++++-
 .../cdc/SchemaAwareStoreWriteOperatorTest.java     | 179 +++++++++++++++------
 5 files changed, 289 insertions(+), 78 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 888c7ebf3..71fb9378b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -85,7 +85,13 @@ public class TypeUtils {
             case BIGINT:
                 return Long.valueOf(s);
             case FLOAT:
-                return Float.valueOf(s);
+                double d = Double.parseDouble(s);
+                if (d == ((float) d)) {
+                    return (float) d;
+                } else {
+                    throw new NumberFormatException(
+                            s + " cannot be cast to float due to precision 
loss");
+                }
             case DOUBLE:
                 return Double.valueOf(s);
             case DATE:
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
index 4c109ca93..d529c7089 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -62,12 +63,12 @@ public class SchemaAwareStoreWriteOperator extends 
AbstractStoreWriteOperator<Cd
 
     @Override
     protected SinkRecord processRecord(CdcRecord record) throws Exception {
-        Map<String, String> fields = record.fields();
-
-        if (!schemaMatched(fields)) {
+        Map<String, Object> convertedFields = tryConvert(record.fields());
+        if (convertedFields == null) {
             while (true) {
                 table = table.copyWithLatestSchema();
-                if (schemaMatched(fields)) {
+                convertedFields = tryConvert(record.fields());
+                if (convertedFields != null) {
                     break;
                 }
                 Thread.sleep(retrySleepMillis);
@@ -78,14 +79,11 @@ public class SchemaAwareStoreWriteOperator extends 
AbstractStoreWriteOperator<Cd
         TableSchema schema = table.schema();
         GenericRow row = new GenericRow(schema.fields().size());
         row.setRowKind(record.kind());
-        for (Map.Entry<String, String> field : fields.entrySet()) {
-            String key = field.getKey();
-            String value = field.getValue();
+        for (Map.Entry<String, Object> convertedField : 
convertedFields.entrySet()) {
+            String key = convertedField.getKey();
+            Object value = convertedField.getValue();
             int idx = schema.fieldNames().indexOf(key);
-            DataType type = schema.fields().get(idx).type();
-            // TODO TypeUtils.castFromString cannot deal with complex types 
like arrays and maps.
-            //  Change type of CdcRecord#field if needed.
-            row.setField(idx, TypeUtils.castFromString(value, type));
+            row.setField(idx, value);
         }
 
         try {
@@ -95,8 +93,26 @@ public class SchemaAwareStoreWriteOperator extends 
AbstractStoreWriteOperator<Cd
         }
     }
 
-    private boolean schemaMatched(Map<String, String> fields) {
-        TableSchema currentSchema = table.schema();
-        return currentSchema.fieldNames().containsAll(fields.keySet());
+    private Map<String, Object> tryConvert(Map<String, String> fields) {
+        Map<String, Object> converted = new HashMap<>();
+        TableSchema schema = table.schema();
+        for (Map.Entry<String, String> field : fields.entrySet()) {
+            String key = field.getKey();
+            String value = field.getValue();
+            int idx = schema.fieldNames().indexOf(key);
+            if (idx < 0) {
+                return null;
+            }
+            DataType type = schema.fields().get(idx).type();
+            // TODO TypeUtils.castFromString cannot deal with complex types 
like arrays and maps.
+            //  Change type of CdcRecord#field if needed.
+            try {
+                converted.put(key, TypeUtils.castFromString(value, type));
+            } catch (Exception e) {
+                LOG.debug("Failed to convert value " + value + " to type " + 
type, e);
+                return null;
+            }
+        }
+        return converted;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
index d3ff71e09..85a9f080e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -20,6 +20,9 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -27,6 +30,9 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * A {@link ProcessFunction} to handle {@link SchemaChange}.
  *
@@ -47,18 +53,90 @@ public class SchemaChangeProcessFunction extends 
ProcessFunction<SchemaChange, V
     public void processElement(
             SchemaChange schemaChange, Context context, Collector<Void> 
collector)
             throws Exception {
-        Preconditions.checkArgument(
-                schemaChange instanceof SchemaChange.AddColumn,
-                "Currently, only SchemaChange.AddColumn is supported.");
-        try {
-            schemaManager.commitChanges(schemaChange);
-        } catch (Exception e) {
-            // This is normal. For example when a table is split into multiple 
database tables, all
-            // these tables will be added the same column. However 
schemaManager can't handle
-            // duplicated column adds, so we just catch the exception and log 
it.
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to perform schema change {}", schemaChange, 
e);
+        if (schemaChange instanceof SchemaChange.AddColumn) {
+            try {
+                schemaManager.commitChanges(schemaChange);
+            } catch (IllegalArgumentException e) {
+                // This is normal. For example when a table is split into 
multiple database tables,
+                // all these tables will be added the same column. However 
schemaManager can't
+                // handle duplicated column adds, so we just catch the 
exception and log it.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Failed to perform SchemaChange.AddColumn {}, "
+                                    + "possibly due to duplicated column name",
+                            schemaChange,
+                            e);
+                }
             }
+        } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
+            SchemaChange.UpdateColumnType updateColumnType =
+                    (SchemaChange.UpdateColumnType) schemaChange;
+            TableSchema schema =
+                    schemaManager
+                            .latest()
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Table does not exist. 
This is unexpected."));
+            int idx = 
schema.fieldNames().indexOf(updateColumnType.fieldName());
+            Preconditions.checkState(
+                    idx >= 0,
+                    "Field name "
+                            + updateColumnType.fieldName()
+                            + " does not exist in table. This is unexpected.");
+            DataType oldType = schema.fields().get(idx).type();
+            DataType newType = updateColumnType.newDataType();
+            if (checkTypeConversion(oldType, newType)) {
+                schemaManager.commitChanges(schemaChange);
+            } else {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Cannot convert field %s from type %s to %s",
+                                updateColumnType.fieldName(), oldType, 
newType));
+            }
+        } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
+            schemaManager.commitChanges(schemaChange);
+        } else if (schemaChange instanceof 
SchemaChange.UpdateColumnNullability) {
+            schemaManager.commitChanges(schemaChange);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported schema change class "
+                            + schemaChange.getClass().getName()
+                            + ", content "
+                            + schemaChange);
+        }
+    }
+
+    private static final List<DataTypeRoot> STRING_TYPES =
+            Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
+    private static final List<DataTypeRoot> INTEGER_TYPES =
+            Arrays.asList(
+                    DataTypeRoot.TINYINT,
+                    DataTypeRoot.SMALLINT,
+                    DataTypeRoot.INTEGER,
+                    DataTypeRoot.BIGINT);
+    private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
+            Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
+
+    private boolean checkTypeConversion(DataType oldType, DataType newType) {
+        int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
+        int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
+        if (oldIdx >= 0 && newIdx >= 0) {
+            return true;
+        }
+
+        oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
+        newIdx = INTEGER_TYPES.indexOf(newType.getTypeRoot());
+        if (oldIdx >= 0 && newIdx >= 0) {
+            return oldIdx <= newIdx;
         }
+
+        oldIdx = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
+        newIdx = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
+        if (oldIdx >= 0 && newIdx >= 0) {
+            return oldIdx <= newIdx;
+        }
+
+        return false;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index eb78c1d07..d912119bc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -89,20 +89,37 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
 
         Map<Integer, Map<String, String>> expected = new HashMap<>();
         List<String> fieldNames = new ArrayList<>();
+        List<Boolean> isBigInt = new ArrayList<>();
         fieldNames.add("v0");
+        isBigInt.add(false);
         int suffixId = 0;
         for (int i = 0; i < numEvents; i++) {
             if (schemaChangePositions.contains(i)) {
-                suffixId++;
-                String newName = "v" + suffixId;
-                fieldNames.add(newName);
-                events[i] = new TestCdcEvent(SchemaChange.addColumn(newName, 
DataTypes.INT()));
+                if (random.nextBoolean()) {
+                    int idx = random.nextInt(fieldNames.size());
+                    isBigInt.set(idx, true);
+                    events[i] =
+                            new TestCdcEvent(
+                                    SchemaChange.updateColumnType(
+                                            fieldNames.get(idx), 
DataTypes.BIGINT()));
+                } else {
+                    suffixId++;
+                    String newName = "v" + suffixId;
+                    fieldNames.add(newName);
+                    isBigInt.add(false);
+                    events[i] = new 
TestCdcEvent(SchemaChange.addColumn(newName, DataTypes.INT()));
+                }
             } else {
                 Map<String, String> fields = new HashMap<>();
                 int key = random.nextInt(numKeys);
                 fields.put("k", String.valueOf(key));
-                for (String fieldName : fieldNames) {
-                    fields.put(fieldName, String.valueOf(random.nextInt()));
+                for (int j = 0; j < fieldNames.size(); j++) {
+                    String fieldName = fieldNames.get(j);
+                    if (isBigInt.get(j)) {
+                        fields.put(fieldName, 
String.valueOf(random.nextLong()));
+                    } else {
+                        fields.put(fieldName, 
String.valueOf(random.nextInt()));
+                    }
                 }
 
                 List<CdcRecord> records = new ArrayList<>();
@@ -174,7 +191,12 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
                 Map<String, String> fields = new HashMap<>();
                 for (int i = 0; i < schema.fieldNames().size(); i++) {
                     if (!row.isNullAt(i)) {
-                        fields.put(schema.fieldNames().get(i), 
String.valueOf(row.getInt(i)));
+                        fields.put(
+                                schema.fieldNames().get(i),
+                                String.valueOf(
+                                        
schema.fields().get(i).type().equals(DataTypes.BIGINT())
+                                                ? row.getLong(i)
+                                                : row.getInt(i)));
                     }
                 }
                 actual.put(Integer.valueOf(fields.get("k")), fields);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
index c80196ef5..0fb5068e3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
@@ -52,6 +52,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -65,11 +66,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link SchemaAwareStoreWriteOperator}. */
 public class SchemaAwareStoreWriteOperatorTest {
 
-    private static final RowType ROW_TYPE =
-            RowType.of(
-                    new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.STRING()},
-                    new String[] {"pt", "k", "v"});
-
     @TempDir java.nio.file.Path tempDir;
 
     private Path tablePath;
@@ -91,35 +87,21 @@ public class SchemaAwareStoreWriteOperatorTest {
 
     @Test
     @Timeout(30)
-    public void testProcessRecord() throws Exception {
-        FileStoreTable table = createFileStoreTable();
+    public void testAddColumn() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"pt", "k", "v"});
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType, Collections.singletonList("pt"), 
Arrays.asList("pt", "k"));
         OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
                 createTestHarness(table);
         harness.open();
 
-        BlockingQueue<CdcRecord> toProcess = new LinkedBlockingQueue<>();
-        BlockingQueue<CdcRecord> processed = new LinkedBlockingQueue<>();
-        AtomicBoolean running = new AtomicBoolean(true);
-        Runnable r =
-                () -> {
-                    long timestamp = 0;
-                    try {
-                        while (running.get()) {
-                            if (toProcess.isEmpty()) {
-                                Thread.sleep(10);
-                                continue;
-                            }
-
-                            CdcRecord record = toProcess.poll();
-                            harness.processElement(record, ++timestamp);
-                            processed.offer(record);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                };
-
-        Thread t = new Thread(r);
+        Runner runner = new Runner(harness);
+        Thread t = new Thread(runner);
         t.start();
 
         // check that records with compatible schema can be processed 
immediately
@@ -129,16 +111,16 @@ public class SchemaAwareStoreWriteOperatorTest {
         fields.put("k", "1");
         fields.put("v", "10");
         CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
-        toProcess.offer(expected);
-        CdcRecord actual = processed.take();
+        runner.offer(expected);
+        CdcRecord actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
         fields = new HashMap<>();
         fields.put("pt", "0");
         fields.put("k", "2");
         expected = new CdcRecord(RowKind.INSERT, fields);
-        toProcess.offer(expected);
-        actual = processed.take();
+        runner.offer(expected);
+        actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
         // check that records with new fields should be processed after schema 
is updated
@@ -149,16 +131,80 @@ public class SchemaAwareStoreWriteOperatorTest {
         fields.put("v", "30");
         fields.put("v2", "300");
         expected = new CdcRecord(RowKind.INSERT, fields);
-        toProcess.offer(expected);
-        actual = processed.poll(1, TimeUnit.SECONDS);
+        runner.offer(expected);
+        actual = runner.poll(1);
         assertThat(actual).isNull();
 
         SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
         schemaManager.commitChanges(SchemaChange.addColumn("v2", 
DataTypes.INT()));
-        actual = processed.take();
+        actual = runner.take();
+        assertThat(actual).isEqualTo(expected);
+
+        runner.stop();
+        t.join();
+        harness.close();
+    }
+
+    @Test
+    @Timeout(30)
+    public void testUpdateColumnType() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.FLOAT()},
+                        new String[] {"k", "v1", "v2"});
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType, Collections.emptyList(), 
Collections.singletonList("k"));
+        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+                createTestHarness(table);
+        harness.open();
+
+        Runner runner = new Runner(harness);
+        Thread t = new Thread(runner);
+        t.start();
+
+        // check that records with compatible schema can be processed 
immediately
+
+        Map<String, String> fields = new HashMap<>();
+        fields.put("k", "1");
+        fields.put("v1", "10");
+        fields.put("v2", "0.625");
+        CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+        runner.offer(expected);
+        CdcRecord actual = runner.take();
+        assertThat(actual).isEqualTo(expected);
+
+        // check that records with new fields should be processed after schema 
is updated
+
+        fields = new HashMap<>();
+        fields.put("k", "2");
+        fields.put("v1", "12345678987654321");
+        fields.put("v2", "0.25");
+        expected = new CdcRecord(RowKind.INSERT, fields);
+        runner.offer(expected);
+        actual = runner.poll(1);
+        assertThat(actual).isNull();
+
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        schemaManager.commitChanges(SchemaChange.updateColumnType("v1", 
DataTypes.BIGINT()));
+        actual = runner.take();
+        assertThat(actual).isEqualTo(expected);
+
+        fields = new HashMap<>();
+        fields.put("k", "3");
+        fields.put("v1", "100");
+        fields.put("v2", "1.0000000000009095");
+        expected = new CdcRecord(RowKind.INSERT, fields);
+        runner.offer(expected);
+        actual = runner.poll(1);
+        assertThat(actual).isNull();
+
+        schemaManager.commitChanges(SchemaChange.updateColumnType("v2", 
DataTypes.DOUBLE()));
+        actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        running.set(false);
+        runner.stop();
         t.join();
         harness.close();
     }
@@ -180,19 +226,62 @@ public class SchemaAwareStoreWriteOperatorTest {
         return harness;
     }
 
-    private FileStoreTable createFileStoreTable() throws Exception {
+    private FileStoreTable createFileStoreTable(
+            RowType rowType, List<String> partitions, List<String> 
primaryKeys) throws Exception {
         Options conf = new Options();
         conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME, 
Duration.ofMillis(10));
 
         TableSchema tableSchema =
                 SchemaUtils.forceCommit(
                         new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(
-                                ROW_TYPE.getFields(),
-                                Collections.singletonList("pt"),
-                                Arrays.asList("pt", "k"),
-                                conf.toMap(),
-                                ""));
+                        new Schema(rowType.getFields(), partitions, 
primaryKeys, conf.toMap(), ""));
         return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
     }
+
+    private static class Runner implements Runnable {
+
+        private final OneInputStreamOperatorTestHarness<CdcRecord, 
Committable> harness;
+        private final BlockingQueue<CdcRecord> toProcess = new 
LinkedBlockingQueue<>();
+        private final BlockingQueue<CdcRecord> processed = new 
LinkedBlockingQueue<>();
+        private final AtomicBoolean running = new AtomicBoolean(true);
+
+        private Runner(OneInputStreamOperatorTestHarness<CdcRecord, 
Committable> harness) {
+            this.harness = harness;
+        }
+
+        private void offer(CdcRecord record) {
+            toProcess.offer(record);
+        }
+
+        private CdcRecord take() throws Exception {
+            return processed.take();
+        }
+
+        private CdcRecord poll(long seconds) throws Exception {
+            return processed.poll(seconds, TimeUnit.SECONDS);
+        }
+
+        private void stop() {
+            running.set(false);
+        }
+
+        @Override
+        public void run() {
+            long timestamp = 0;
+            try {
+                while (running.get()) {
+                    if (toProcess.isEmpty()) {
+                        Thread.sleep(10);
+                        continue;
+                    }
+
+                    CdcRecord record = toProcess.poll();
+                    harness.processElement(record, ++timestamp);
+                    processed.offer(record);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }

Reply via email to