This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d705dcc4188 [HUDI-5173] Skip if there is only one file in clusteringGroup (#7159)
d705dcc4188 is described below

commit d705dcc4188223fbd824f36a5d211abeda7b1f23
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Fri Feb 24 10:23:25 2023 +0800

    [HUDI-5173] Skip if there is only one file in clusteringGroup (#7159)
    
    Introduce a new option 'hoodie.clustering.plan.strategy.single.group.clustering.enabled'
    to allow disabling clustering of a single file group. When clustering sort is
    disabled, clustering a single file group rewrites the same data without changing
    its layout, so it only incurs needless read/write cost.
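    
    As a usage sketch, a writer can opt out of single file group clustering
    through the new builder method; the base path below is hypothetical and
    the other settings are only illustrative:
    
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie_table")               // hypothetical base path
            .withClusteringConfig(HoodieClusteringConfig.newBuilder()
                .withSingleGroupClusteringEnabled(false) // skip lone file groups
                .withClusteringSortColumns("")           // no sort, so skipping is safe
                .build())
            .build();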
---
 .../apache/hudi/config/HoodieClusteringConfig.java | 11 +++
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 12 +++
 .../FlinkSizeBasedClusteringPlanStrategy.java      | 32 ++++++--
 .../TestFlinkSizeBasedClusteringPlanStrategy.java  | 96 ++++++++++++++++++++++
 .../SparkSizeBasedClusteringPlanStrategy.java      | 10 ++-
 ...TestSparkBuildClusteringGroupsForPartition.java | 93 +++++++++++++++++++++
 .../realtime/TestHoodieRealtimeRecordReader.java   |  1 +
 7 files changed, 243 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index bfcd4315d29..b76a66d91c5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -182,6 +182,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("Each group can produce 'N' 
(CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups");
 
+  public static final ConfigProperty<Boolean> PLAN_STRATEGY_SINGLE_GROUP_CLUSTERING_ENABLED = ConfigProperty
+      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "single.group.clustering.enabled")
+      .defaultValue(true)
+      .sinceVersion("0.14.0")
+      .withDocumentation("Whether to generate a clustering plan when there is only one file group involved, true by default");
+
   public static final ConfigProperty<String> PLAN_STRATEGY_SORT_COLUMNS = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns")
       .noDefaultValue()
@@ -469,6 +475,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withSingleGroupClusteringEnabled(Boolean enabled) {
+      clusteringConfig.setValue(PLAN_STRATEGY_SINGLE_GROUP_CLUSTERING_ENABLED, String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode mode) {
       clusteringConfig.setValue(PLAN_PARTITION_FILTER_MODE_NAME.key(), mode.toString());
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f82ac90c424..2ccd0435d3a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1601,10 +1601,22 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST);
   }
 
+  public boolean isSingleGroupClusteringEnabled() {
+    return getBoolean(HoodieClusteringConfig.PLAN_STRATEGY_SINGLE_GROUP_CLUSTERING_ENABLED);
+  }
+
+  public boolean shouldClusteringSingleGroup() {
+    return isClusteringSortEnabled() || isSingleGroupClusteringEnabled();
+  }
+
   public String getClusteringSortColumns() {
     return getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS);
   }
 
+  public boolean isClusteringSortEnabled() {
+    return !StringUtils.isNullOrEmpty(getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS));
+  }
+
   public HoodieClusteringConfig.LayoutOptimizationStrategy getLayoutOptimizationStrategy() {
     return HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(
         getStringOrDefault(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY)
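
The gating added to HoodieWriteConfig is easiest to read in isolation: a plan
for a lone file group is built only when sorting is on or the new flag is left
at its default. A self-contained sketch of that semantics (the class name is
illustrative, not part of the patch):

    final class SingleGroupGateSketch {
      // Mirrors shouldClusteringSingleGroup(): a sort still changes the data
      // layout, so a lone file group is worth rewriting whenever sorting is on.
      static boolean shouldClusterSingleGroup(String sortColumns, boolean singleGroupEnabled) {
        boolean sortEnabled = sortColumns != null && !sortColumns.isEmpty();
        return sortEnabled || singleGroupEnabled;
      }

      public static void main(String[] args) {
        System.out.println(shouldClusterSingleGroup("f0", false)); // true: sort forces a rewrite
        System.out.println(shouldClusterSingleGroup("", false));   // false: the plan is skipped
        System.out.println(shouldClusterSingleGroup("", true));    // true: default behavior
      }
    }
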
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index 3abffe38d8b..ac320ceefe6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -63,15 +63,23 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
 
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort file slices by size in descending order before dividing, which packs the groups more tightly
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> Long.compare(
+        o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(),
+        o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize()));
+
     long totalSizeSoFar = 0;
 
-    for (FileSlice currentSlice : fileSlices) {
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
-      // in now, every clustering group out put is 1 file group.
-      if (totalSizeSoFar >= writeConfig.getClusteringTargetFileMaxBytes() && !currentGroup.isEmpty()) {
+      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
         LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size());
-        fileSliceGroups.add(Pair.of(currentGroup, 1));
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
         currentGroup = new ArrayList<>();
         totalSizeSoFar = 0;
       }
@@ -79,11 +87,16 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
       // Add to the current file-group
       currentGroup.add(currentSlice);
       // assume each file group size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+      totalSizeSoFar += currentSize;
     }
 
     if (!currentGroup.isEmpty()) {
-      fileSliceGroups.add(Pair.of(currentGroup, 1));
+      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
     }
 
     return fileSliceGroups.stream().map(fileSliceGroup ->
@@ -106,8 +119,11 @@ public class FlinkSizeBasedClusteringPlanStrategy<T>
   @Override
   protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
     return super.getFileSlicesEligibleForClustering(partition)
-        // Only files that have basefile size smaller than small file size are eligible.
+        // Only files that have base file size smaller than small file size are eligible.
         .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
   }
 
+  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
+    return (int) Math.ceil(groupSize / (double) targetFileSize);
+  }
 }
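
To make the Flink planner's arithmetic concrete, here is a standalone sketch of
the descending-size packing and the Math.ceil output-group calculation above
(the byte sizes and limits are made-up values, not Hudi defaults):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    final class GroupingSketch {
      public static void main(String[] args) {
        long maxBytesInGroup = 300L; // stand-in for getClusteringMaxBytesInGroup()
        long targetFileSize = 120L;  // stand-in for getClusteringTargetFileMaxBytes()
        List<Long> sizes = new ArrayList<>(Arrays.asList(100L, 250L, 40L, 90L));
        sizes.sort((a, b) -> Long.compare(b, a)); // descending, as in the patch

        List<Long> group = new ArrayList<>();
        long totalSoFar = 0;
        for (long size : sizes) {
          // Close the current group once adding the next slice would overflow it.
          if (totalSoFar + size > maxBytesInGroup && !group.isEmpty()) {
            emit(group, totalSoFar, targetFileSize);
            group = new ArrayList<>();
            totalSoFar = 0;
          }
          group.add(size);
          totalSoFar += size;
        }
        if (!group.isEmpty()) {
          emit(group, totalSoFar, targetFileSize); // the real code gates lone groups here
        }
      }

      static void emit(List<Long> group, long total, long targetFileSize) {
        // One output file group per targetFileSize worth of input, rounded up.
        int outputGroups = (int) Math.ceil(total / (double) targetFileSize);
        System.out.println("group=" + group + " total=" + total + " outputGroups=" + outputGroups);
      }
    }

Running it prints "group=[250] total=250 outputGroups=3" and
"group=[100, 90, 40] total=230 outputGroups=2", matching the plan strategy's
bookkeeping under those made-up sizes.
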
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestFlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestFlinkSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 00000000000..97f12abf322
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestFlinkSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link FlinkSizeBasedClusteringPlanStrategy}.
+ */
+public class TestFlinkSizeBasedClusteringPlanStrategy {
+
+  @Mock
+  HoodieFlinkCopyOnWriteTable table;
+  @Mock
+  HoodieFlinkEngineContext context;
+  HoodieWriteConfig.Builder hoodieWriteConfigBuilder;
+
+  @BeforeEach
+  public void setUp() {
+    this.hoodieWriteConfigBuilder = HoodieWriteConfig
+            .newBuilder()
+            .withPath("path1");
+  }
+
+  @Test
+  public void testBuildClusteringGroupsForPartitionOnlyOneFile() {
+    String partition = "20221117";
+    String fileId = "fg-1";
+    List<FileSlice> fileSliceGroups = new ArrayList<>();
+    fileSliceGroups.add(generateFileSlice(partition, fileId, "0"));
+    // test buildClusteringGroupsForPartition with ClusteringSortColumns config
+    HoodieWriteConfig configWithSortEnabled = hoodieWriteConfigBuilder.withClusteringConfig(
+        HoodieClusteringConfig.newBuilder()
+          .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+          .withSingleGroupClusteringEnabled(false)
+          .withClusteringSortColumns("f0")
+          .build())
+        .build();
+    PartitionAwareClusteringPlanStrategy strategyWithSortEnabled = new FlinkSizeBasedClusteringPlanStrategy(table, context, configWithSortEnabled);
+    Stream<HoodieClusteringGroup> groupStreamSort = strategyWithSortEnabled.buildClusteringGroupsForPartition(partition, fileSliceGroups);
+    assertEquals(1, groupStreamSort.count());
+
+    // test buildClusteringGroupsForPartition without ClusteringSortColumns config
+    HoodieWriteConfig configWithSortDisabled = hoodieWriteConfigBuilder.withClusteringConfig(
+        HoodieClusteringConfig.newBuilder()
+          .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+          .withSingleGroupClusteringEnabled(false)
+          .withClusteringSortColumns("")
+          .build())
+        .build();
+    PartitionAwareClusteringPlanStrategy strategyWithSortDisabled = new FlinkSizeBasedClusteringPlanStrategy(table, context, configWithSortDisabled);
+    Stream<HoodieClusteringGroup> groupStreamWithOutSort = strategyWithSortDisabled.buildClusteringGroupsForPartition(partition, fileSliceGroups);
+    assertEquals(0, groupStreamWithOutSort.count());
+  }
+
+  private FileSlice generateFileSlice(String partitionPath, String fileId, String baseInstant) {
+    FileSlice fs = new FileSlice(new HoodieFileGroupId(partitionPath, fileId), baseInstant);
+    fs.setBaseFile(new HoodieBaseFile(FSUtils.makeBaseFileName(baseInstant, "1-0-1", fileId)));
+    return fs;
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index fe6d7c2efc2..d2fa89ec772 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -92,10 +92,12 @@ public class SparkSizeBasedClusteringPlanStrategy<T>
     }
 
     if (!currentGroup.isEmpty()) {
-      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
-          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
     }
 
     return fileSliceGroups.stream().map(fileSliceGroup ->
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkBuildClusteringGroupsForPartition.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkBuildClusteringGroupsForPartition.java
new file mode 100644
index 00000000000..d12761012c4
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkBuildClusteringGroupsForPartition.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestSparkBuildClusteringGroupsForPartition {
+  @Mock
+  HoodieSparkCopyOnWriteTable table;
+  @Mock
+  HoodieSparkEngineContext context;
+  HoodieWriteConfig.Builder hoodieWriteConfigBuilder;
+
+  @BeforeEach
+  public void setUp() {
+    this.hoodieWriteConfigBuilder = HoodieWriteConfig
+      .newBuilder()
+      .withPath("path1");
+  }
+
+  @Test
+  public void testBuildClusteringGroupsForPartitionOnlyOneFile() {
+    String partition = "20221117";
+    String fileId = "fg-1";
+    List<FileSlice> fileSliceGroups = new ArrayList<>();
+    fileSliceGroups.add(generateFileSlice(partition, fileId, "0"));
+    // test buildClusteringGroupsForPartition with ClusteringSortColumns config
+    HoodieWriteConfig configWithSortEnabled = hoodieWriteConfigBuilder.withClusteringConfig(
+        HoodieClusteringConfig.newBuilder()
+          .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+          .withSingleGroupClusteringEnabled(false)
+          .withClusteringSortColumns("f0")
+          .build())
+        .build();
+    PartitionAwareClusteringPlanStrategy strategyWithSortEnabled = new SparkSizeBasedClusteringPlanStrategy(table, context, configWithSortEnabled);
+    Stream<HoodieClusteringGroup> groupStreamSort = strategyWithSortEnabled.buildClusteringGroupsForPartition(partition, fileSliceGroups);
+    assertEquals(1, groupStreamSort.count());
+
+    // test buildClusteringGroupsForPartition without ClusteringSortColumns config
+    HoodieWriteConfig configWithSortDisabled = hoodieWriteConfigBuilder.withClusteringConfig(
+        HoodieClusteringConfig.newBuilder()
+          .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+          .withSingleGroupClusteringEnabled(false)
+          .withClusteringSortColumns("")
+          .build())
+        .build();
+    PartitionAwareClusteringPlanStrategy strategyWithSortDisabled = new SparkSizeBasedClusteringPlanStrategy(table, context, configWithSortDisabled);
+    Stream<HoodieClusteringGroup> groupStreamWithOutSort = strategyWithSortDisabled.buildClusteringGroupsForPartition(partition, fileSliceGroups);
+    assertEquals(0, groupStreamWithOutSort.count());
+  }
+
+  private FileSlice generateFileSlice(String partitionPath, String fileId, String baseInstant) {
+    FileSlice fs = new FileSlice(new HoodieFileGroupId(partitionPath, fileId), baseInstant);
+    fs.setBaseFile(new HoodieBaseFile(FSUtils.makeBaseFileName(baseInstant, "1-0-1", fileId)));
+    return fs;
+  }
+}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index f334bbf3bc9..6c530833d55 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -686,6 +686,7 @@ public class TestHoodieRealtimeRecordReader {
       JobConf newJobConf = new JobConf(baseJobConf);
       List<Schema.Field> fields = schema.getFields();
       setHiveColumnNameProps(fields, newJobConf, false);
+      newJobConf.set("columns.types", 
"string,string,string,string,string,string,string,string,bigint,string,string");
       RecordReader<NullWritable, ArrayWritable> reader  = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
       // use reader to read log file.
       NullWritable key = reader.createKey();
