EMsnap commented on code in PR #8096: URL: https://github.com/apache/inlong/pull/8096#discussion_r1209088899
########## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hive; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.table.filesystem.OutputFormatFactory; +import org.apache.flink.types.Row; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.function.Function; + +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; + +/** Hive {@link OutputFormatFactory}, use {@link RecordWriter} to write record. */ +public class HiveOutputFormatFactory implements OutputFormatFactory<Row> { + + private static final long serialVersionUID = 2L; + + private final HashMap<Path, HiveWriterFactory> factoryMap = new HashMap<>(16); + + private final HiveWriterFactory factory; + + private final boolean sinkMultipleEnable; Review Comment: parameter not used ########## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/HadoopRenameFileCommitter.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hive.filesystem; + +import org.apache.inlong.sort.hive.util.CacheHolder; + +import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The Hadoop file committer that directly rename the in-progress file to the target file. For + * FileSystem like S3, renaming may lead to additional copies. + */ +public class HadoopRenameFileCommitter implements HadoopFileCommitter { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopRenameFileCommitter.class); + + private final Configuration configuration; + + private Path targetFilePath; + + private Path tempFilePath; + + private boolean sinkMultipleEnable; + + public HadoopRenameFileCommitter(Configuration configuration, + Path targetFilePath, + boolean sinkMultipleEnable) + throws IOException { + this.configuration = configuration; + this.targetFilePath = targetFilePath; + this.tempFilePath = generateTempFilePath(); + this.sinkMultipleEnable = sinkMultipleEnable; + } + + public HadoopRenameFileCommitter(Configuration configuration, + Path targetFilePath, + Path inProgressPath, + boolean sinkMultipleEnable) { + this.configuration = configuration; + this.targetFilePath = targetFilePath; + this.tempFilePath = inProgressPath; + this.sinkMultipleEnable = sinkMultipleEnable; + } + + @Override + public Path getTargetFilePath() { + return targetFilePath; + } + + @Override + public Path getTempFilePath() { + return tempFilePath; + } + + public Configuration getConfiguration() { + return configuration; + } + + @Override + public void preCommit() { + // Do nothing. + } + + @Override + public void commit() throws IOException { + if (sinkMultipleEnable) { + commitMultiple(true); + } else { + rename(true); + } + } + + @Override + public void commitAfterRecovery() throws IOException { + if (sinkMultipleEnable) { + commitMultiple(false); + } else { + rename(false); + } + } + + private void commitMultiple(boolean assertFileExists) throws IOException { + LOG.info("file committer cache {}", CacheHolder.getFileCommitterHashMap()); + Iterator<Path> iterator = CacheHolder.getFileCommitterHashMap().keySet().iterator(); + while (iterator.hasNext()) { + Path path = iterator.next(); + if (path.getName().equals(tempFilePath.getName())) { + HadoopRenameFileCommitter committer = CacheHolder.getFileCommitterHashMap().get(path); + committer.rename(assertFileExists); + iterator.remove(); + } + } + } + + private void rename(boolean assertFileExists) throws IOException { + FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration); + + if (!fileSystem.exists(tempFilePath)) { + if (assertFileExists) { + throw new IOException( + String.format("In progress file(%s) not exists.", tempFilePath)); + } else { + // By pass the re-commit if source file not exists. + // TODO: in the future we may also need to check if the target file exists. + return; + } + } + + try { + // If file exists, it will be overwritten. + fileSystem.rename(tempFilePath, targetFilePath); + } catch (IOException e) { + throw new IOException( + String.format( + "Could not commit file from %s to %s", tempFilePath, targetFilePath), + e); + } + } + + private Path generateTempFilePath() throws IOException { + checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute"); + + FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration); + + Path parent = targetFilePath.getParent(); + String name = targetFilePath.getName(); + + while (true) { + Path candidate = + new Path(parent, "." + name + ".inprogress." + UUID.randomUUID().toString()); Review Comment: Unnecessary 'toString()' call ########## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveOutputFormatFactory.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hive; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.table.filesystem.OutputFormatFactory; +import org.apache.flink.types.Row; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.function.Function; + +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; + +/** Hive {@link OutputFormatFactory}, use {@link RecordWriter} to write record. */ +public class HiveOutputFormatFactory implements OutputFormatFactory<Row> { + + private static final long serialVersionUID = 2L; + + private final HashMap<Path, HiveWriterFactory> factoryMap = new HashMap<>(16); Review Comment: parameter not used ########## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/PartitionCommitter.java: ########## @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hive.filesystem; + +import org.apache.inlong.sort.hive.HiveTableMetaStoreFactory; +import org.apache.inlong.sort.hive.table.HiveTableInlongFactory; + +import org.apache.commons.lang.time.StopWatch; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory; +import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; +import org.apache.flink.table.filesystem.EmptyMetaStoreFactory; +import org.apache.flink.table.filesystem.FileSystemFactory; +import org.apache.flink.table.filesystem.MetastoreCommitPolicy; +import org.apache.flink.table.filesystem.PartitionCommitPolicy; +import org.apache.flink.table.filesystem.SuccessFileCommitPolicy; +import org.apache.flink.table.filesystem.TableMetaStoreFactory; +import org.apache.flink.table.filesystem.stream.PartitionCommitInfo; +import org.apache.flink.table.filesystem.stream.PartitionCommitTrigger; +import org.apache.flink.table.filesystem.stream.TaskTracker; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_CLASS; +import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND; +import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME; +import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath; +import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; + +/** + * Committer operator for partitions. This is the single (non-parallel) task. It collects all the + * partition information sent from upstream, and triggers the partition submission decision when it + * judges to collect the partitions from all tasks of a checkpoint. + * + * <p>NOTE: It processes records after the checkpoint completes successfully. Receive records from + * upstream {@link CheckpointListener#notifyCheckpointComplete}. + * + * <p>Processing steps: 1.Partitions are sent from upstream. Add partition to trigger. 2.{@link + * TaskTracker} say it have already received partition data from all tasks in a checkpoint. + * 3.Extracting committable partitions from {@link PartitionCommitTrigger}. 4.Using {@link + * PartitionCommitPolicy} chain to commit partitions. + */ +public class PartitionCommitter extends AbstractStreamOperator<Void> + implements + OneInputStreamOperator<PartitionCommitInfo, Void> { + + /** + * hdfs files which modified less than HDFS_FILE_MODIFIED_THRESHOLD will be committed partitions + */ + private static final long HDFS_FILES_MODIFIED_THRESHOLD = 5 * 60 * 1000L; + + private static final long serialVersionUID = 1L; + + private final Configuration conf; + + private Path locationPath; + + private final ObjectIdentifier tableIdentifier; + + private final List<String> partitionKeys; + + private final TableMetaStoreFactory metaStoreFactory; + + private final FileSystemFactory fsFactory; + + private transient PartitionCommitTrigger trigger; + + private transient TaskTracker taskTracker; + + private transient long currentWatermark; + + private transient List<PartitionCommitPolicy> policies; + + private final HiveShim hiveShim; Review Comment: this field is never used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
