[ 
https://issues.apache.org/jira/browse/HUDI-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386366#comment-17386366
 ] 

ASF GitHub Bot commented on HUDI-2209:
--------------------------------------

yuzhaojing commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r675673937



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Helper class for bulk insert used by Flink.
+ */
+public class BulkInsertWriterHelper {
+
+  private static final Logger LOG = 
LogManager.getLogger(BulkInsertWriterHelper.class);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig writeConfig;
+  private final RowType rowType;
+  private final Boolean arePartitionRecordsSorted;
+  private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
+  private HoodieRowDataCreateHandle handle;
+  private String lastKnownPartitionPath = null;
+  private final String fileIdPrefix;
+  private int numFilesWritten = 0;
+  private final Map<String, HoodieRowDataCreateHandle> handles = new 
HashMap<>();
+  private final KeyGenerator keyGenerator;
+  private final Schema schema;

Review comment:
       These do not seem to be in use.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Helper class for bulk insert used by Flink.
+ */
+public class BulkInsertWriterHelper {
+
+  private static final Logger LOG = 
LogManager.getLogger(BulkInsertWriterHelper.class);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig writeConfig;
+  private final RowType rowType;
+  private final Boolean arePartitionRecordsSorted;
+  private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
+  private HoodieRowDataCreateHandle handle;
+  private String lastKnownPartitionPath = null;
+  private final String fileIdPrefix;
+  private int numFilesWritten = 0;
+  private final Map<String, HoodieRowDataCreateHandle> handles = new 
HashMap<>();
+  private final KeyGenerator keyGenerator;
+  private final Schema schema;
+  private final RowDataKeyGen keyGen;
+
+  public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
+                                String instantTime, int taskPartitionId, long 
taskId, long taskEpochId, RowType rowType,
+                                boolean arePartitionRecordsSorted) {
+    this.hoodieTable = hoodieTable;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.rowType = addMetadataFields(rowType); // patch up with metadata fields
+    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
+    this.fileIdPrefix = UUID.randomUUID().toString();
+    try {
+      this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(writeConfig.getProps()));
+    } catch (IOException e) {
+      throw new HoodieException("Create key generator error", e);
+    }
+    this.schema = getWriteSchema(writeConfig);
+    this.keyGen = RowDataKeyGen.instance(conf, rowType);
+  }
+
+  /**
+   * Returns the write instant time.
+   */
+  public String getInstantTime() {
+    return this.instantTime;
+  }
+
+  /**
+   * Get the schema of the actual write.
+   */
+  private static Schema getWriteSchema(HoodieWriteConfig config) {
+    return new Schema.Parser().parse(config.getWriteSchema());
+  }
+
+  public void write(RowData record) throws IOException {
+    try {
+      String recordKey = keyGen.getRecordKey(record);
+      String partitionPath = keyGen.getPartitionPath(record);
+
+      if ((lastKnownPartitionPath == null) || 
!lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
+        LOG.info("Creating new file for partition path " + partitionPath);
+        handle = getRowCreateHandle(partitionPath);
+        lastKnownPartitionPath = partitionPath;
+      }
+      handle.write(recordKey, partitionPath, record);
+    } catch (Throwable t) {
+      LOG.error("Global error thrown while trying to write records in 
HoodieRowCreateHandle ", t);
+      throw t;
+    }
+  }
+
+  public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException 
{
+    close();
+    return writeStatusList;
+  }
+
+  public void abort() {
+  }

Review comment:
       Same as above — this method does not seem to be in use.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Helper class for bulk insert used by Flink.
+ */
+public class BulkInsertWriterHelper {
+
+  private static final Logger LOG = 
LogManager.getLogger(BulkInsertWriterHelper.class);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig writeConfig;
+  private final RowType rowType;
+  private final Boolean arePartitionRecordsSorted;
+  private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
+  private HoodieRowDataCreateHandle handle;
+  private String lastKnownPartitionPath = null;
+  private final String fileIdPrefix;
+  private int numFilesWritten = 0;
+  private final Map<String, HoodieRowDataCreateHandle> handles = new 
HashMap<>();
+  private final KeyGenerator keyGenerator;
+  private final Schema schema;
+  private final RowDataKeyGen keyGen;
+
+  public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
+                                String instantTime, int taskPartitionId, long 
taskId, long taskEpochId, RowType rowType,
+                                boolean arePartitionRecordsSorted) {
+    this.hoodieTable = hoodieTable;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.rowType = addMetadataFields(rowType); // patch up with metadata fields
+    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
+    this.fileIdPrefix = UUID.randomUUID().toString();
+    try {
+      this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(writeConfig.getProps()));
+    } catch (IOException e) {
+      throw new HoodieException("Create key generator error", e);
+    }
+    this.schema = getWriteSchema(writeConfig);
+    this.keyGen = RowDataKeyGen.instance(conf, rowType);
+  }
+
+  /**
+   * Returns the write instant time.
+   */
+  public String getInstantTime() {
+    return this.instantTime;
+  }
+
+  /**
+   * Get the schema of the actual write.
+   */
+  private static Schema getWriteSchema(HoodieWriteConfig config) {
+    return new Schema.Parser().parse(config.getWriteSchema());
+  }
+
+  public void write(RowData record) throws IOException {
+    try {
+      String recordKey = keyGen.getRecordKey(record);
+      String partitionPath = keyGen.getPartitionPath(record);
+
+      if ((lastKnownPartitionPath == null) || 
!lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
+        LOG.info("Creating new file for partition path " + partitionPath);

Review comment:
       It is possible to reuse the existing file handle here, so this log line can be misleading. Should we move this log statement to the point where a new 
HoodieRowDataCreateHandle instance is actually created?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Bulk insert for flink writer
> ----------------------------
>
>                 Key: HUDI-2209
>                 URL: https://issues.apache.org/jira/browse/HUDI-2209
>             Project: Apache Hudi
>          Issue Type: New Feature
>          Components: Flink Integration
>            Reporter: Danny Chen
>            Assignee: Danny Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to