lvyanquan commented on code in PR #3254:
URL: https://github.com/apache/flink-cdc/pull/3254#discussion_r1829218514


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.utils.StringUtils;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.util.CollectionUtil;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.Tables;
+import com.aliyun.odps.task.SQLTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Schema evolution utils for maxcompute. */
+public class SchemaEvolutionUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaEvolutionUtils.class);
+    private static final Map<String, String> unsupportSchemahints = new 
HashMap<>();
+    private static final Map<String, String> supportSchemaHints = new 
HashMap<>();
+
+    static {
+        unsupportSchemahints.put("odps.sql.type.system.odps2", "true");
+        unsupportSchemahints.put("odps.sql.decimal.odps2", "true");
+        unsupportSchemahints.put("odps.sql.allow.schema.evolution", "true");
+
+        supportSchemaHints.put("odps.sql.type.system.odps2", "true");
+        supportSchemaHints.put("odps.sql.decimal.odps2", "true");
+        supportSchemaHints.put("odps.namespace.schema", "true");
+        supportSchemaHints.put("odps.sql.allow.namespace.schema", "true");
+        supportSchemaHints.put("odps.sql.allow.schema.evolution", "true");
+    }
+
+    private SchemaEvolutionUtils() {}
+
+    /**
+     * equals to run a sql like: create table table_name (col_name1 type1 
comment [, col_name2 type2
+     * ...]);.
+     */
+    public static void createTable(MaxComputeOptions options, TableId tableId, 
Schema schema)
+            throws OdpsException {
+        Odps odps = MaxComputeUtils.getOdps(options);
+        TableSchema tableSchema = TypeConvertUtils.toMaxCompute(schema);
+        if (options.isSupportSchema()
+                && 
!StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) {
+            LOG.info("create schema {}", tableId.getNamespace());
+            odps.schemas()
+                    .create(
+                            odps.getDefaultProject(),
+                            tableId.getNamespace(),
+                            "generate by Flink CDC",
+                            true);
+        }
+        Tables.TableCreator tableCreator =
+                odps.tables()
+                        .newTableCreator(
+                                odps.getDefaultProject(), 
tableId.getTableName(), tableSchema)
+                        .withHints(unsupportSchemahints)
+                        .ifNotExists()
+                        .debug();
+        if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
+            tableCreator
+                    .transactionTable()
+                    .withBucketNum(options.getBucketSize())
+                    .withPrimaryKeys(schema.primaryKeys());
+        }
+        if (options.isSupportSchema()) {
+            if (StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) {
+                
tableCreator.withSchemaName("default").withHints(supportSchemaHints);
+            } else {
+                
tableCreator.withSchemaName(tableId.getNamespace()).withHints(supportSchemaHints);
+            }
+        }
+        LOG.info("create table {}, schema {}", getFullTableName(options, 
tableId), schema);
+        tableCreator.create();
+    }
+
+    /**
+     * equals to run a sql like: 'alter table table_name add columns 
(col_name1 type1 comment [,
+     * col_name2 type2 ...]);'.
+     */
+    public static void addColumns(
+            MaxComputeOptions options,
+            TableId tableId,
+            List<AddColumnEvent.ColumnWithPosition> columns)
+            throws OdpsException {
+        Odps odps = MaxComputeUtils.getOdps(options);
+
+        StringBuilder sqlBuilder =
+                new StringBuilder(
+                        "alter table " + getFullTableName(options, tableId) + 
" add columns (");
+
+        for (AddColumnEvent.ColumnWithPosition addColumn : columns) {
+            if (addColumn.getPosition() == AddColumnEvent.ColumnPosition.LAST) 
{
+                sqlBuilder
+                        .append(addColumn.getAddColumn().getName())
+                        .append(" ")
+                        .append(string(addColumn.getAddColumn().getType()))
+                        .append(" comment '")
+                        
.append(addColumn.getAddColumn().getType().asSummaryString())
+                        .append("',");
+            } else {
+                throw new UnsupportedOperationException(
+                        "Not support position: "
+                                + addColumn.getPosition()
+                                + " "
+                                + addColumn.getExistedColumnName());
+            }
+        }
+        // remove ','
+        sqlBuilder.deleteCharAt(sqlBuilder.length() - 1);
+        sqlBuilder.append(");");
+
+        Instance instance =
+                SQLTask.run(
+                        odps,
+                        odps.getDefaultProject(),
+                        sqlBuilder.toString(),
+                        options.isSupportSchema() ? supportSchemaHints : 
unsupportSchemahints,
+                        null);
+        LOG.info("execute add column task: `{}`, instanceId: {}", sqlBuilder, 
instance.getId());
+        instance.waitForSuccess();
+    }
+
+    /**
+     * equals to run a sql like: 'alter table table_name change column 
old_column_name
+     * new_column_name new_data_type;'. and 'alter table table_name change 
column col_name comment
+     * 'col_comment'';
+     */
+    public static void alterColumnType(
+            MaxComputeOptions options, TableId tableId, Map<String, DataType> 
typeMapping)
+            throws OdpsException {

Review Comment:
   What kind of column type change can we support, can user get clear error 
message if a type change is not supported?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import 
org.apache.flink.cdc.connectors.maxcompute.common.UncheckedOdpsException;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.SchemaEvolutionUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link MetadataApplier} for "MaxCompute" connector. */
+public class MaxComputeMetadataApplier implements MetadataApplier {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(MaxComputeMetadataApplier.class);
+
+    private final MaxComputeOptions maxComputeOptions;
+
+    public MaxComputeMetadataApplier(MaxComputeOptions maxComputeOptions) {
+        this.maxComputeOptions = maxComputeOptions;
+    }
+
+    @Override
+    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+        LOG.info("MaxCompute apply schema change event: {}", 
schemaChangeEvent);
+        try {
+            if (schemaChangeEvent instanceof CreateTableEvent) {
+                CreateTableEvent createTableEvent = (CreateTableEvent) 
schemaChangeEvent;
+                if (MaxComputeUtils.isTableExist(maxComputeOptions, 
createTableEvent.tableId())) {
+                    Table table =
+                            MaxComputeUtils.getTable(maxComputeOptions, 
createTableEvent.tableId());
+                    TableSchema expectSchema =
+                            
TypeConvertUtils.toMaxCompute(createTableEvent.getSchema());
+                    if (!MaxComputeUtils.schemaEquals(table.getSchema(), 
expectSchema)) {
+                        throw new IllegalStateException(
+                                "The schema of create table event is not 
equals to exist table schema, please drop/rename exist table before flink cdc 
task start.");
+                    }
+                    if (!CollectionUtils.isEqualCollection(
+                            createTableEvent.getSchema().primaryKeys(), 
table.getPrimaryKey())) {
+                        throw new IllegalStateException(
+                                "The primary key of create table event is not 
equals to exist table primary key, please drop/rename exist table before flink 
cdc task start.");
+                    }
+                } else {
+                    SchemaEvolutionUtils.createTable(
+                            maxComputeOptions,
+                            createTableEvent.tableId(),
+                            createTableEvent.getSchema());
+                }
+            } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
+                AlterColumnTypeEvent alterColumnTypeEvent =
+                        (AlterColumnTypeEvent) schemaChangeEvent;
+                SchemaEvolutionUtils.alterColumnType(
+                        maxComputeOptions,
+                        alterColumnTypeEvent.tableId(),
+                        alterColumnTypeEvent.getTypeMapping());
+            } else if (schemaChangeEvent instanceof DropColumnEvent) {
+                DropColumnEvent dropColumnEvent = (DropColumnEvent) 
schemaChangeEvent;
+                SchemaEvolutionUtils.dropColumn(
+                        maxComputeOptions,
+                        dropColumnEvent.tableId(),
+                        dropColumnEvent.getDroppedColumnNames());
+            } else if (schemaChangeEvent instanceof RenameColumnEvent) {
+                RenameColumnEvent renameColumnEvent = (RenameColumnEvent) 
schemaChangeEvent;
+                SchemaEvolutionUtils.renameColumn(
+                        maxComputeOptions,
+                        renameColumnEvent.tableId(),
+                        renameColumnEvent.getNameMapping());
+            } else if (schemaChangeEvent instanceof AddColumnEvent) {
+                AddColumnEvent addColumnEvent = (AddColumnEvent) 
schemaChangeEvent;
+                SchemaEvolutionUtils.addColumns(
+                        maxComputeOptions,
+                        addColumnEvent.tableId(),
+                        addColumnEvent.getAddedColumns());
+            } else {
+                throw new UnsupportedOperationException(

Review Comment:
   Do we plan to support DropTableEvent and TruncateTableEvent.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtilsTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.connectors.maxcompute.EmulatorTestBase;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.TableSchema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * e2e test of SchemaEvolutionUtils, Note that the Emulator only supports 
uppercase input (However,
+ * MaxCompute can correctly distinguish between uppercase and lowercase).
+ *
+ * <p>Since the emulator does not support alter column type, the test cases 
here are mainly for
+ * testing other schema evolution logic.
+ */
+class SchemaEvolutionUtilsTest extends EmulatorTestBase {

Review Comment:
   We can add a test for alterColumnType method.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import 
org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionRequest;
+import 
org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionResponse;
+import 
org.apache.flink.cdc.connectors.maxcompute.coordinator.message.WaitForFlushSuccessRequest;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.SerializedValue;
+
+import com.aliyun.odps.PartitionSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Processes a {@link DataChangeEvent}, extracting data and encapsulating it 
into a {@link
+ * SessionIdentifier}, and then sends a {@link CreateSessionRequest} to the 
{@link
+ * SessionManageCoordinator} to create a writing session. Subsequently, it 
incorporates the
+ * SessionId into the metadata of the {@link DataChangeEvent} for downstream 
processing.
+ */
+public class SessionManageOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event>, OperatorEventHandler, 
BoundedOneInput {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(SessionManageOperator.class);
+
+    /** a tricky way to get an Operator from sink. */
+    public static SessionManageOperator instance;

Review Comment:
   We should add `todo` to note that we can try to remove it in the future. 



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/MaxComputeUtils.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.utils.StringUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import 
org.apache.flink.cdc.connectors.maxcompute.common.UncheckedOdpsException;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import 
org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.account.StsAccount;
+import com.aliyun.odps.tunnel.Configuration;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.io.CompressOption;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/** common utils use for maxcompute connector. */
+public class MaxComputeUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MaxComputeUtils.class);
+
+    public static Odps getOdps(MaxComputeOptions maxComputeOptions) {
+        Account account;
+        if 
(StringUtils.isNullOrWhitespaceOnly(maxComputeOptions.getStsToken())) {
+            account =
+                    new AliyunAccount(
+                            maxComputeOptions.getAccessId(), 
maxComputeOptions.getAccessKey());
+        } else {
+            account =
+                    new StsAccount(
+                            maxComputeOptions.getAccessId(),
+                            maxComputeOptions.getAccessKey(),
+                            maxComputeOptions.getStsToken());
+        }
+        Odps odps = new Odps(account);
+        odps.setEndpoint(maxComputeOptions.getEndpoint());
+        odps.setTunnelEndpoint(maxComputeOptions.getTunnelEndpoint());
+        odps.setDefaultProject(maxComputeOptions.getProject());
+        odps.setUserAgent("Flink CDC");
+        return odps;
+    }
+
+    public static TableTunnel getTunnel(
+            MaxComputeOptions maxComputeOptions, MaxComputeWriteOptions 
writeOptions) {
+        Odps odps = getOdps(maxComputeOptions);
+        Configuration configuration =
+                Configuration.builder(odps)
+                        .withRetryLogger(RetryUtils.getRetryLogger())
+                        .withRetryPolicy(new 
RetryUtils.FlinkDefaultRetryPolicy())
+                        .withCompressOptions(
+                                MaxComputeUtils.compressOptionOf(
+                                        writeOptions.getCompressAlgorithm()))
+                        .withQuotaName(maxComputeOptions.getQuotaName())
+                        .build();
+        TableTunnel tunnel = new TableTunnel(odps, configuration);
+        if 
(!StringUtils.isNullOrWhitespaceOnly(maxComputeOptions.getTunnelEndpoint())) {
+            tunnel.setEndpoint(maxComputeOptions.getTunnelEndpoint());
+        }
+        return tunnel;
+    }
+
+    public static Table getTable(MaxComputeOptions maxComputeOptions, TableId 
tableId) {
+        Odps odps = getOdps(maxComputeOptions);
+        if (maxComputeOptions.isSupportSchema()) {
+            return odps.tables()
+                    .get(
+                            maxComputeOptions.getProject(),
+                            tableId.getNamespace(),
+                            tableId.getTableName());
+        } else {
+            return odps.tables().get(tableId.getTableName());
+        }
+    }
+
+    public static TableSchema getTableSchema(MaxComputeOptions options, 
TableId tableId) {
+        Odps odps = getOdps(options);
+        if (options.isSupportSchema()) {
+            return odps.tables()
+                    .get(options.getProject(), tableId.getNamespace(), 
tableId.getTableName())
+                    .getSchema();
+        } else {
+            return odps.tables().get(options.getProject(), 
tableId.getTableName()).getSchema();
+        }
+    }
+
+    public static boolean supportSchema(MaxComputeOptions maxComputeOptions) {
+        Odps odps = getOdps(maxComputeOptions);
+        try {
+            boolean flag =
+                    Boolean.parseBoolean(
+                            
odps.projects().get().getProperty(Constant.SCHEMA_ENABLE_FLAG));
+            LOG.info("project {} is support schema: {}", 
maxComputeOptions.getProject(), flag);
+            return flag;
+        } catch (OdpsException e) {
+            throw new UncheckedOdpsException(e);
+        }
+    }
+
+    public static CompressOption compressOptionOf(String compressAlgo) {
+        CompressOption.CompressAlgorithm compressAlgorithm;
+        switch (compressAlgo) {

Review Comment:
   What about using CompressOption.CompressAlgorithm.valueOf() to simplify this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to