dybyte commented on code in PR #9462: URL: https://github.com/apache/seatunnel/pull/9462#discussion_r2160874496
########## seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.util; + +import org.apache.seatunnel.shade.com.google.common.util.concurrent.Striped; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions; + +import com.aliyun.odps.PartitionSpec; +import com.aliyun.odps.TableSchema; +import com.aliyun.odps.data.ArrayRecord; +import com.aliyun.odps.data.Record; +import com.aliyun.odps.data.RecordWriter; +import com.aliyun.odps.tunnel.TableTunnel; +import com.aliyun.odps.tunnel.TunnelException; +import com.aliyun.odps.tunnel.streams.UpsertStream; +import 
lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.locks.Lock; + +@Slf4j +public class MaxcomputeOutputFormat { + private static final int MIN_LOCK_COUNT = 16; + private static final int MAX_LOCK_COUNT = 2048; + private final Striped<Lock> stripedLocks; + private final PrimaryKey primaryKey; + + private final ReadonlyConfig readonlyConfig; + + private final TableSchema tableSchema; + private final SeaTunnelRowType rowType; + private final FormatterContext formatterContext; + private final String tunnelEndPoint; + + private RecordWriter recordWriter; + private UpsertStream upsertStream; + private TableTunnel.UploadSession uploadSession; + private TableTunnel.UpsertSession upsertSession; + + public MaxcomputeOutputFormat( + SeaTunnelRowType rowType, + ReadonlyConfig readonlyConfig, + TableSchema tableSchema, + FormatterContext formatterContext, + String tunnelEndPoint, + PrimaryKey primaryKey, + int lockCount) { + this.rowType = rowType; + this.readonlyConfig = readonlyConfig; + this.tableSchema = tableSchema; + this.formatterContext = formatterContext; + this.tunnelEndPoint = tunnelEndPoint; + this.primaryKey = primaryKey; + int stripes = validateLockCount(lockCount); + this.stripedLocks = Striped.lock(stripes); + } + + public void write(SeaTunnelRow seaTunnelRow) throws IOException, TunnelException { + switch (seaTunnelRow.getRowKind()) { + case INSERT: + insertRecord(seaTunnelRow); + break; + case UPDATE_AFTER: + upsertRecord(seaTunnelRow); + break; + case DELETE: + deleteRecord(seaTunnelRow); + break; + default: + throw CommonError.unsupportedDataType( + MaxcomputeBaseOptions.PLUGIN_NAME, + seaTunnelRow.getRowKind().toString(), + seaTunnelRow.toString()); + } + } + + public void close() throws IOException, TunnelException { + closeUploadSession(); + closeUpsertSession(); + } + + private void closeUploadSession() throws IOException, TunnelException { 
+ if (recordWriter != null) { + try { + recordWriter.close(); + } finally { + recordWriter = null; + } + } + if (uploadSession != null) { + uploadSession.commit(); + } + } + + private void closeUpsertSession() throws IOException, TunnelException { + if (upsertStream != null) { + try { + upsertStream.flush(); + upsertStream.close(); + } finally { + upsertStream = null; + } + } + + if (upsertSession != null) { + try { + upsertSession.commit(true); + } finally { + upsertSession.close(); + upsertSession = null; + } + } + } + + int validateLockCount(int inputCount) { + if (inputCount < MIN_LOCK_COUNT) { + return MIN_LOCK_COUNT; + } + if (inputCount > MAX_LOCK_COUNT) { + return MAX_LOCK_COUNT; + } + return inputCount; + } + + private void insertRecord(SeaTunnelRow seaTunnelRow) throws TunnelException, IOException { + ensureInsertSessionAndWriter(); + Record arrayRecord = + MaxcomputeTypeMapper.getMaxcomputeRowData( + new ArrayRecord(tableSchema), + seaTunnelRow, + this.tableSchema, + this.rowType, + formatterContext); + recordWriter.write(arrayRecord); + } + + private void upsertRecord(SeaTunnelRow seaTunnelRow) throws TunnelException, IOException { + ensureUpsertSessionAndWriter(); + Record upsertRecord = + MaxcomputeTypeMapper.getMaxcomputeRowData( + upsertSession.newRecord(), + seaTunnelRow, + this.tableSchema, + this.rowType, + formatterContext); + + lockProcess(seaTunnelRow, () -> upsertStream.upsert(upsertRecord)); Review Comment: I might have misunderstood and added a PK-based lock for this part. The reason I considered adding a PK-based lock is because of the note in the official documentation: > "Due to the writing characteristics of the primary key table, we should carefully control the writing logic when writing to the same table (partition) concurrently. If there are multiple concurrent writes to the same primary key at the same time, unexpected behavior may occur. 
A common solution is to use the shuffle by PK operation to assign records with the same primary key to the same thread for writing." After re-checking the `org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter` class, I found that each parallel instance indeed processes records in a single pipeline and also implements PK-based partitioning using the primary key’s hash to ensure that records with the same PK go to the same queue/thread. Therefore, my additional lock is redundant. I will remove the lock logic accordingly. Thanks for pointing this out and sorry for the confusion! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
