lvyanquan commented on code in PR #3254:
URL: https://github.com/apache/flink-cdc/pull/3254#discussion_r1635846252


##########
docs/content.zh/docs/connectors/maxcompute.md:
##########
@@ -0,0 +1,342 @@
+---
+title: "MaxCompute"
+weight: 7
+type: docs
+aliases:
+  - /connectors/maxcompute
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# MaxCompute Connector
+
+The MaxCompute Pipeline connector can be used as the *Data Sink* of a pipeline, writing data to [MaxCompute](https://www.aliyun.com/product/odps).
+This document describes how to set up the MaxCompute Pipeline connector.
+
+## Connector Features
+
+* Automatic table creation
+* Synchronization of schema changes
+* Real-time data synchronization
+
+## Example
+
+A pipeline that reads data from MySQL and synchronizes it to MaxCompute can be defined as follows:
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+  server-id: 5401-5404
+
+sink:
+  type: maxcompute
+  name: MaxCompute Sink
+  accessId: ak
+  accessKey: sk
+  endpoint: endpoint
+  project: flink_cdc
+  bucketSize: 8
+
+pipeline:
+  name: MySQL to MaxCompute Pipeline
+  parallelism: 2
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+   <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>type</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The connector to use; this must be set to <code>'maxcompute'</code>.</td>
+    </tr>
+    <tr>
+      <td>name</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the sink.</td>
+    </tr>
+    <tr>
+      <td>accessId</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AccessKey ID of an Alibaba Cloud account or RAM user. You can obtain it on the <a href="https://ram.console.aliyun.com/manage/ak">
+            AccessKey management page</a>.</td>
+    </tr>
+    <tr>
+      <td>accessKey</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AccessKey Secret corresponding to the AccessKey ID. You can obtain it on the <a href="https://ram.console.aliyun.com/manage/ak">
+            AccessKey management page</a>.</td>
+    </tr>
+    <tr>
+      <td>endpoint</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The connection endpoint of the MaxCompute service. You need to configure it according to the region selected when the MaxCompute project was created and the network connection type. For the endpoint of each region and network, see <a href="https://help.aliyun.com/zh/maxcompute/user-guide/endpoints">
+           Endpoint</a>.</td>
+    </tr>
+    <tr>
+      <td>project</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The MaxCompute project name. You can log in to the <a href="https://maxcompute.console.aliyun.com/">
+           MaxCompute console</a> and find it on the Workspace > Project Management page.</td>
+    </tr>
+    <tr>
+      <td>tunnelEndpoint</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The connection endpoint of the MaxCompute Tunnel service. Normally this option can be routed automatically based on the region of the specified project; set it only in special network environments, such as behind a proxy.</td>
+    </tr>
+    <tr>
+      <td>quotaName</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the exclusive resource group used for MaxCompute data transfer. If not specified, the shared resource group is used. For details, see <a href="https://help.aliyun.com/zh/maxcompute/user-guide/purchase-and-use-exclusive-resource-groups-for-dts">
+           Using exclusive resource groups for MaxCompute</a>.</td>
+    </tr>
+    <tr>
+      <td>stsToken</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Required when authenticating with a short-lived access token (STS token) issued for a RAM role.</td>
+    </tr>
+    <tr>
+      <td>bucketsNum</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">16</td>
+      <td>Integer</td>
+      <td>The number of buckets used when automatically creating MaxCompute transactional tables. For usage, see <a href="ttps://help.aliyun.com/zh/maxcompute/user-guide/table-data-format">

Review Comment:
   Invalid link.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventSink.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageCoordinatedOperatorFactory;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeExecutionOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.io.IOException;
+
+/** A {@link Sink} of {@link Event} to MaxCompute. */
+public class MaxComputeEventSink implements Sink<Event>, WithPreWriteTopology<Event> {
+    private static final long serialVersionUID = 1L;
+    private final MaxComputeOptions options;
+    private final MaxComputeWriteOptions writeOptions;
+    private final MaxComputeExecutionOptions executionOptions;
+
+    public MaxComputeEventSink(
+            MaxComputeOptions options,
+            MaxComputeWriteOptions writeOptions,
+            MaxComputeExecutionOptions executionOptions) {
+        this.options = options;
+        this.writeOptions = writeOptions;
+        this.executionOptions = executionOptions;
+    }
+
+    @Override
+    public DataStream<Event> addPreWriteTopology(DataStream<Event> inputDataStream) {
+        SingleOutputStreamOperator<Event> stream =
+                inputDataStream.transform(
+                        "SessionManageOperator",
+                        new EventTypeInfo(),
+                        new SessionManageCoordinatedOperatorFactory(
+                                options, writeOptions, executionOptions));
+        stream.uid(Constant.PIPELINE_SESSION_MANAGE_OPERATOR_UID);
+
+        //        stream =
+        //                stream.transform(
+        //                                "PartitionByBucket",
+        //                                new PartitioningEventTypeInfo(),
+        //                                new PartitionOperator(
+        //                                        stream.getParallelism(), options.getBucketSize()))
+        //                        .partitionCustom(new EventPartitioner(), new
+        // PartitioningEventKeySelector())
+        //                        .transform(
+        //                                "PostPartition",
+        //                                new EventTypeInfo(),
+        //                                new PostPartitionOperator(stream.getParallelism()))
+        //                        .name("PartitionByBucket");

Review Comment:
   So PartitionOperator is actually unused?
   I've found [your Jira](https://issues.apache.org/jira/browse/FLINK-35237) about this, and I think it's a more versatile and scalable solution, so we can wait for that Jira to be completed and build on it.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventWriter.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageOperator;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionRequest;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionResponse;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeExecutionOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+import org.apache.flink.cdc.connectors.maxcompute.writer.MaxComputeWriter;
+import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+import com.aliyun.odps.data.ArrayRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/** A {@link SinkWriter} of {@link Event} to MaxCompute. */
+public class MaxComputeEventWriter implements SinkWriter<Event> {
+    private static final Logger LOG = LoggerFactory.getLogger(MaxComputeEventWriter.class);
+
+    private final Sink.InitContext context;
+    private final MaxComputeOptions options;
+    private final MaxComputeWriteOptions writeOptions;
+    private final MaxComputeExecutionOptions executionOptions;
+    private final Map<String, MaxComputeWriter> writerMap;
+    private final Map<TableId, Schema> schemaCache;
+
+    public MaxComputeEventWriter(
+            MaxComputeOptions options,
+            MaxComputeWriteOptions writeOptions,
+            MaxComputeExecutionOptions executionOptions,
+            Sink.InitContext context) {
+        this.context = context;
+        this.options = options;
+        this.writeOptions = writeOptions;
+        this.executionOptions = executionOptions;
+
+        this.writerMap = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+    }
+
+    @Override
+    public void write(Event element, Context context) throws IOException {
+        LOG.info("Sink writer {} write {}.", this.context.getSubtaskId(), 
element);

Review Comment:
   It's unnecessary to create so many logs.
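   For instance, a minimal sketch of what this could look like if the per-record log is kept at all (guarded debug level instead of info, so steady-state runs stay quiet):
   ```java
   // Sketch only: demote the per-event log from INFO to DEBUG (or drop it entirely),
   // so a normal run does not produce one log line per record.
   if (LOG.isDebugEnabled()) {
       LOG.debug("Sink writer {} writes {}.", this.context.getSubtaskId(), element);
   }
   ```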



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java:
##########
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Binary;
+import com.aliyun.odps.data.SimpleStruct;
+import com.aliyun.odps.data.Struct;
+import com.aliyun.odps.table.utils.Preconditions;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
+
+/**
+ * Data type mapping table. This table shows the mapping relationship from Flink types to MaxCompute
+ * types and the corresponding Java type representation.
+ *
+ * <pre>
+ * | Flink Type                        | MaxCompute Type | Flink Java Type          | MaxCompute Java Type    |
+ * |-----------------------------------|-----------------|--------------------------|-------------------------|
+ * | CHAR/VARCHAR/STRING               | STRING          | StringData               | String                  |
+ * | BOOLEAN                           | BOOLEAN         | Boolean                  | Boolean                 |
+ * | BINARY/VARBINARY                  | BINARY          | byte[]                   | odps.data.Binary        |
+ * | DECIMAL                           | DECIMAL         | DecimalData              | BigDecimal              |
+ * | TINYINT                           | TINYINT         | Byte                     | Byte                    |
+ * | SMALLINT                          | SMALLINT        | Short                    | Short                   |
+ * | INTEGER                           | INTEGER         | Integer                  | Integer                 |
+ * | BIGINT                            | BIGINT          | Long                     | Long                    |
+ * | FLOAT                             | FLOAT           | Float                    | Float                   |
+ * | DOUBLE                            | DOUBLE          | Double                   | Double                  |
+ * | TIME_WITHOUT_TIME_ZONE            | STRING          | Integer                  | String                  |
+ * | DATE                              | DATE            | Integer                  | LocalDate               |
+ * | TIMESTAMP_WITHOUT_TIME_ZONE       | TIMESTAMP_NTZ   | TimestampData            | LocalDateTime           |
+ * | TIMESTAMP_WITH_LOCAL_TIME_ZONE    | TIMESTAMP       | LocalZonedTimestampData  | Instant                 |
+ * | TIMESTAMP_WITH_TIME_ZONE          | TIMESTAMP       | ZonedTimestampData       | Instant                 |
+ * | ARRAY                             | ARRAY           | ArrayData                | ArrayList               |
+ * | MAP                               | MAP             | MapData                  | HashMap                 |
+ * | ROW                               | STRUCT          | RowData                  | odps.data.SimpleStruct  |
+ * </pre>
+ *
+ * <p>When converting, put the Flink type name into the column comment to facilitate conversion
+ * back.
+ */
+public class TypeConvertUtils {
+
+    public static TableSchema toMaxCompute(Schema flinkSchema) {
+        Preconditions.checkNotNull(flinkSchema, "flink Schema");
+        TableSchema tableSchema = new TableSchema();
+        Set<String> primaryKeys = new HashSet<>(flinkSchema.primaryKeys());
+        Set<String> partitionKeys = new HashSet<>(flinkSchema.partitionKeys());
+        List<org.apache.flink.cdc.common.schema.Column> columns = flinkSchema.getColumns();
+        for (int i = 0; i < flinkSchema.getColumnCount(); i++) {
+            org.apache.flink.cdc.common.schema.Column flinkColumn = columns.get(i);
+            Column odpsColumn =
+                    toMaxCompute(flinkColumn, primaryKeys.contains(flinkColumn.getName()));
+            if (partitionKeys.contains(flinkColumn.getName())) {
+                tableSchema.addPartitionColumn(odpsColumn);
+            } else {
+                tableSchema.addColumn(odpsColumn);
+            }
+        }
+        return tableSchema;
+    }
+
+    public static Column toMaxCompute(
+            org.apache.flink.cdc.common.schema.Column flinkColumn, boolean isPrimaryKey) {
+        Preconditions.checkNotNull(flinkColumn, "flink Schema Column");
+        DataType type = flinkColumn.getType();
+        Column.ColumnBuilder columnBuilder =
+                Column.newBuilder(flinkColumn.getName(), toMaxCompute(type))
+                        .withComment(type.asSummaryString());
+        if (isPrimaryKey) {
+            columnBuilder.primaryKey();
+        }
+        return columnBuilder.build();
+    }
+
+    public static TypeInfo toMaxCompute(DataType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case TIME_WITHOUT_TIME_ZONE:
+                return TypeInfoFactory.STRING;

Review Comment:
   `DataType` carries nullable/not-null information; do we lose it here?
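   One way to keep it, sketched under the assumption that the odps `Column.ColumnBuilder` exposes a not-null marker (the method name below is not verified against the SDK), would be to read nullability from the Flink `DataType` when building the column, since `TypeInfo` itself cannot express it:
   ```java
   // Sketch only: nullability cannot live on TypeInfo, so it would have to be set on the
   // Column. `notNull()` is an assumed builder method, not a verified odps-sdk API.
   Column.ColumnBuilder columnBuilder =
           Column.newBuilder(flinkColumn.getName(), toMaxCompute(type))
                   .withComment(type.asSummaryString());
   if (isPrimaryKey) {
       columnBuilder.primaryKey();
   }
   if (!type.isNullable()) {
       columnBuilder.notNull(); // assumed API; adjust to whatever the SDK exposes
   }
   return columnBuilder.build();
   ```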



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory:
##########
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.cdc.connectors.maxcompute.MaxComputeDataSinkFactory

Review Comment:
   It's better to add a `log4j2-test.properties` file under resources for debugging and testing purposes, like the other connectors.
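   For reference, a minimal file along the lines of what other connector modules ship (plus the ASF license header) could look like this; the exact levels are just a suggestion:
   ```properties
   # Minimal test logging configuration, modeled on other flink-cdc connector modules.
   rootLogger.level = INFO
   rootLogger.appenderRef.test.ref = TestLogger

   appender.testlogger.name = TestLogger
   appender.testlogger.type = CONSOLE
   appender.testlogger.target = SYSTEM_ERR
   appender.testlogger.layout.type = PatternLayout
   appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
   ```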



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtilsTest.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.connectors.maxcompute.MockedMaxComputeOptions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import com.aliyun.odps.OdpsException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.jupiter.api.Test;
+
+/** e2e test of SchemaEvolutionUtils. */
+public class SchemaEvolutionUtilsTest {

Review Comment:
   This test class does not actually take effect; can we use the `maxcompute/maxcompute-emulator:v0.0.3` image to test it?
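   A rough sketch of how the emulator could be wired in with Testcontainers; the exposed port (8080) and the readiness check are assumptions about the image, not verified:
   ```java
   import org.junit.jupiter.api.BeforeAll;
   import org.testcontainers.containers.GenericContainer;
   import org.testcontainers.containers.wait.strategy.Wait;
   import org.testcontainers.utility.DockerImageName;

   // Sketch: start the MaxCompute emulator with Testcontainers and run the tests against it.
   // The exposed port and the wait strategy are assumptions about the emulator image.
   static final GenericContainer<?> EMULATOR =
           new GenericContainer<>(DockerImageName.parse("maxcompute/maxcompute-emulator:v0.0.3"))
                   .withExposedPorts(8080)
                   .waitingFor(Wait.forListeningPort());

   @BeforeAll
   static void startEmulator() {
       EMULATOR.start();
       // This endpoint would then be used instead of a real MaxCompute endpoint,
       // e.g. when building the options in MockedMaxComputeOptions.
       String endpoint = "http://" + EMULATOR.getHost() + ":" + EMULATOR.getMappedPort(8080);
   }
   ```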



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
