yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569905107



##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
##########
@@ -58,22 +59,24 @@
 
   private static final Map<String, String> EXPECTED = new HashMap<>();

Review comment:
       Rename it to `EXPECTED1`?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +260,37 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
     checkPropNames.forEach(prop ->
         Preconditions.checkState(!props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if an error occurs while writing metadata
+   */
+  public static void initTable(Configuration conf) throws IOException {

Review comment:
       This method is called from many places within a single Flink job. Is it really invoked that many times?
   
   If so, to make the semantics clearer, how about renaming it to `initTableIfNotExist` or something like `initTableOnDemand`?
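A minimal sketch of what `initTableIfNotExist` semantics could look like, assuming initialization can be reduced to creating the `.hoodie` metadata folder under the table base path (the real method writes table metadata, which this sketch omits). The class name `TableInitSketch` is hypothetical. The point is that repeated calls from many operators become cheap no-ops once the table exists.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class TableInitSketch {
  // Hudi keeps table metadata under "<basePath>/.hoodie"; writing the actual
  // metadata files is deliberately left out of this sketch.
  static final String META_FOLDER = ".hoodie";

  private TableInitSketch() {}

  /**
   * Hypothetical idempotent initializer: creates the metadata folder only if it is missing.
   *
   * @return true if this call initialized the table, false if it already existed
   */
  static boolean initTableIfNotExist(Path basePath) throws IOException {
    Path metaPath = basePath.resolve(META_FOLDER);
    if (Files.isDirectory(metaPath)) {
      return false; // already initialized: repeated calls are cheap no-ops
    }
    Files.createDirectories(metaPath);
    return true;
  }
}
```

The exists-then-create check is not atomic, so two concurrent callers may both return `true`. That is harmless for creating a folder, but a real implementation that writes metadata files would need a stronger guard.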

##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/BucketAssignerTest.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Test cases for {@link BucketAssigner}. */

Review comment:
       Aligning this with the project's existing Javadoc style for class comments would look better.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
##########
@@ -71,8 +71,7 @@
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
       + "to individual classes, for supported properties.")
-  public String propsFilePath =
-      "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";

Review comment:
       Does this default value cause any problem? AFAIK, it was copied from DeltaStreamer.
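For context, the removed default is evaluated against the JVM working directory (`user.dir`) and points into `src/test/resources`, so whether it names an existing file depends on where the process is launched. A small sketch that evaluates the same expression (class and method names are hypothetical):

```java
import java.nio.file.Files;
import java.nio.file.Paths;

final class PropsPathSketch {
  private PropsPathSketch() {}

  // Same expression as the removed default of FlinkStreamerConfig#propsFilePath.
  static String defaultPropsFilePath() {
    return "file://" + System.getProperty("user.dir")
        + "/src/test/resources/delta-streamer-config/dfs-source.properties";
  }

  // True only if the JVM was started from a directory that contains this test resource.
  static boolean defaultExistsLocally() {
    return Files.exists(Paths.get(System.getProperty("user.dir"),
        "src", "test", "resources", "delta-streamer-config", "dfs-source.properties"));
  }
}
```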

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
##########
@@ -0,0 +1,106 @@
+package org.apache.hudi.operator.transform;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+
+/** Function that transforms RowData to HoodieRecord. */

Review comment:
       Please align the comment style with the rest of the project.

##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/BucketAssignerTest.java
##########
@@ -0,0 +1,233 @@
+/** Test cases for {@link BucketAssigner}. */
+public class BucketAssignerTest {

Review comment:
       Since this is a unit test, shall we follow the naming convention `TestXXX`, i.e. `TestBucketAssigner`?

##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
##########
@@ -92,6 +92,19 @@
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_THREE = Arrays.asList(
+      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,

Review comment:
       Are these all identical rows? If so, why not build them in a loop?
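A sketch of the loop-based approach, assuming the rows really are identical. `repeat` is a hypothetical helper; it calls the factory once per row so each element is a distinct object, whereas `Collections.nCopies` would repeat a single reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class RepeatedRows {
  private RepeatedRows() {}

  /** Builds {@code count} rows, invoking {@code factory} once per row. */
  static <T> List<T> repeat(Supplier<T> factory, int count) {
    List<T> rows = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      rows.add(factory.get()); // fresh instance each iteration
    }
    return rows;
  }
}
```

In `TestData`, the factory would simply wrap the existing `binaryRow(...)` call with the same arguments as the current rows, and the count would be whatever `DATA_SET_THREE` needs.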

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignerFunction.java
##########
@@ -0,0 +1,149 @@
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The function builds the write profile incrementally for records within a checkpoint,
+ * then assigns each record a bucket ID using the {@link BucketAssigner}.
+ *
+ * <p>All records are tagged with a HoodieRecordLocation. Instead of a real instant time,
+ * an INSERT record uses "I" and an UPSERT record uses "U" as the instant time. There is no need
+ * to keep the "real" instant time for each record, because the bucket ID (partition path & fileID)
+ * is what decides where the record is written. The "I" and "U" tags are only used downstream to
+ * decide whether the data bucket is an INSERT or an UPSERT bucket; we should factor them out once
+ * the underlying writer supports specifying the bucket type explicitly.
+ *
+ * <p>The output records should then be shuffled by the bucket ID to achieve scalable writes.
+ *
+ * @see BucketAssigner
+ */
+public class BucketAssignerFunction<K, I, O extends HoodieRecord<?>>

Review comment:
       Would `BucketAssignFunction` be a better name?
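To illustrate the tagging scheme described in the `BucketAssignerFunction` Javadoc above, here is a minimal sketch using hypothetical types (not Hudi's `HoodieRecordLocation`): the bucket ID is the (partition path, file ID) pair, the "I"/"U" tag only marks the bucket type, and routing depends on the bucket ID alone. The `/` separator and the `floorMod` routing are assumptions for illustration; Flink's `keyBy` uses its own key-group hashing.

```java
import java.util.Objects;

final class BucketSketch {
  // Instant-time tags described in the BucketAssignerFunction Javadoc.
  static final String INSERT_TAG = "I";
  static final String UPSERT_TAG = "U";

  private BucketSketch() {}

  /** Hypothetical stand-in for a record location plus its partition path. */
  static final class TaggedLocation {
    final String partitionPath;
    final String fileId;
    final String instantTag;

    TaggedLocation(String partitionPath, String fileId, String instantTag) {
      this.partitionPath = Objects.requireNonNull(partitionPath);
      this.fileId = Objects.requireNonNull(fileId);
      this.instantTag = Objects.requireNonNull(instantTag);
    }

    /** Bucket ID = partition path + file ID; the separator is an assumption. */
    String bucketId() {
      return partitionPath + "/" + fileId;
    }

    /** Downstream uses the tag only to tell INSERT buckets from UPSERT buckets. */
    boolean isInsertBucket() {
      return INSERT_TAG.equals(instantTag);
    }
  }

  /** Simplified shuffle: the same bucket ID always maps to the same writer index. */
  static int writerIndex(TaggedLocation loc, int parallelism) {
    return Math.floorMod(loc.bucketId().hashCode(), parallelism);
  }
}
```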




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
