This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 0217061  PARQUET-1951: Allow merge strategies to combine key values 
(#847)
0217061 is described below

commit 02170613d12b5122b07149136382a71505fa39d8
Author: satishkotha <[email protected]>
AuthorDate: Thu Jan 7 01:44:38 2021 -0800

    PARQUET-1951: Allow merge strategies to combine key values (#847)
---
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 39 ++++++++++-
 ...ConcatenatingKeyValueMetadataMergeStrategy.java | 61 +++++++++++++++++
 .../parquet/hadoop/metadata/GlobalMetaData.java    | 28 ++++----
 .../metadata/KeyValueMetadataMergeStrategy.java    | 42 ++++++++++++
 .../StrictKeyValueMetadataMergeStrategy.java       | 42 ++++++++++++
 .../parquet/hadoop/TestParquetFileWriter.java      | 57 ++++++++++++++++
 .../apache/parquet/tools/command/MergeCommand.java | 79 ++++++++++++++++++++--
 7 files changed, 329 insertions(+), 19 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 98699cf..c06e7a8 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -70,8 +70,10 @@ import 
org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
+import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
@@ -1290,6 +1292,22 @@ public class ParquetFileWriter {
    */
   @Deprecated
   public static ParquetMetadata mergeMetadataFiles(List<Path> files,  
Configuration conf) throws IOException {
+    return mergeMetadataFiles(files, conf, new 
StrictKeyValueMetadataMergeStrategy());
+  }
+
+  /**
+   * Given a list of metadata files, merge them into a single ParquetMetadata
+   * Requires that the schemas be compatible; values appearing under the same
+   * extraMetadata key are combined by the supplied merge strategy rather than
+   * being required to be exactly equal.
+   * @param files a list of files to merge metadata from
+   * @param conf a configuration
+   * @param keyValueMetadataMergeStrategy strategy to merge values for same 
key, if there are multiple
+   * @return merged parquet metadata for the files
+   * @throws IOException if there is an error while reading the metadata files
+   * @deprecated metadata files are not recommended and will be removed in 
2.0.0
+   */
+  @Deprecated
+  public static ParquetMetadata mergeMetadataFiles(List<Path> files, 
Configuration conf,
+                                                   
KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException 
{
     Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list 
of metadata");
 
     GlobalMetaData globalMetaData = null;
@@ -1303,7 +1321,7 @@ public class ParquetFileWriter {
     }
 
     // collapse GlobalMetaData into a single FileMetaData, which will throw if 
they are not compatible
-    return new ParquetMetadata(globalMetaData.merge(), blocks);
+    return new 
ParquetMetadata(globalMetaData.merge(keyValueMetadataMergeStrategy), blocks);
   }
 
   /**
@@ -1384,7 +1402,24 @@ public class ParquetFileWriter {
     metadata.close();
   }
 
+  /**
+   * Will merge the metadata of all the footers together
+   * @param root the directory containing all footers
+   * @param footers the list of file footers to merge
+   * @return the global meta data for all the footers
+   */
   static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
+    return mergeFooters(root, footers, new 
StrictKeyValueMetadataMergeStrategy());
+  }
+
+  /**
+   * Will merge the metadata of all the footers together
+   * @param root the directory containing all footers
+   * @param footers the list of file footers to merge
+   * @param keyValueMergeStrategy strategy to merge values for a given key (if 
there are multiple values)
+   * @return the global meta data for all the footers
+   */
+  static ParquetMetadata mergeFooters(Path root, List<Footer> footers, 
KeyValueMetadataMergeStrategy keyValueMergeStrategy) {
     String rootPath = root.toUri().getPath();
     GlobalMetaData fileMetaData = null;
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
@@ -1403,7 +1438,7 @@ public class ParquetFileWriter {
         blocks.add(block);
       }
     }
-    return new ParquetMetadata(fileMetaData.merge(), blocks);
+    return new ParquetMetadata(fileMetaData.merge(keyValueMergeStrategy), 
blocks);
   }
 
   /**
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ConcatenatingKeyValueMetadataMergeStrategy.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ConcatenatingKeyValueMetadataMergeStrategy.java
new file mode 100644
index 0000000..e121928
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ConcatenatingKeyValueMetadataMergeStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ *  Strategy to concatenate if there are multiple values for a given key in 
metadata.
+ *  Note: use this with caution. This is expected to work only for certain use 
cases.
+ */
+public class ConcatenatingKeyValueMetadataMergeStrategy implements 
KeyValueMetadataMergeStrategy {
+  private static final String DEFAULT_DELIMITER = ",";
+  
+  private final String delimiter;
+
+  /**
+   * Default constructor.
+   */
+  public ConcatenatingKeyValueMetadataMergeStrategy() {
+    this.delimiter = DEFAULT_DELIMITER;
+  }
+
+  /**
+   * Constructor to use different delimiter for concatenation.
+   * 
+   * @param delim delimiter char sequence.
+   */
+  public ConcatenatingKeyValueMetadataMergeStrategy(String delim) {
+    this.delimiter = delim;
+  }
+  
+  /**
+   * @param keyValueMetaData map from each key to all values seen across files
+   * @return map with, per key, all values concatenated using the configured delimiter
+   */
+  public Map<String, String> merge(Map<String, Set<String>> keyValueMetaData) {
+    Map<String, String> mergedKeyValues = new HashMap<String, String>();
+    for (Map.Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
+      mergedKeyValues.put(entry.getKey(), 
entry.getValue().stream().collect(Collectors.joining(this.delimiter)));
+    }
+    return mergedKeyValues;
+  }
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
index 311e323..ff2ef64 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,10 +21,8 @@ package org.apache.parquet.hadoop.metadata;
 import static java.util.Collections.unmodifiableMap;
 
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.parquet.schema.MessageType;
@@ -87,19 +85,25 @@ public class GlobalMetaData implements Serializable {
    * Will merge the metadata as if it was coming from a single file.
    * (for all part files written together this will always work)
    * If there are conflicting values an exception will be thrown
+   * 
+   * Provided for backward compatibility
    * @return the merged version of this
    */
   public FileMetaData merge() {
+     return merge(new StrictKeyValueMetadataMergeStrategy());
+   }
+
+  /**
+   * Will merge the metadata as if it was coming from a single file.
+   * (for all part files written together this will always work)
+   * Conflicting values for the same key are resolved by the given strategy
+   * (the strict strategy throws; other strategies may combine the values).
+   * @param keyValueMetadataMergeStrategy strategy used to merge the key-value metadata
+   * @return the merged version of this
+   */
+  public FileMetaData merge(KeyValueMetadataMergeStrategy 
keyValueMetadataMergeStrategy) {
     String createdByString = createdBy.size() == 1 ?
       createdBy.iterator().next() :
       createdBy.toString();
-    Map<String, String> mergedKeyValues = new HashMap<String, String>();
-    for (Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
-      if (entry.getValue().size() > 1) {
-        throw new RuntimeException("could not merge metadata: key " + 
entry.getKey() + " has conflicting values: " + entry.getValue());
-      }
-      mergedKeyValues.put(entry.getKey(), entry.getValue().iterator().next());
-    }
+    Map<String, String> mergedKeyValues = 
keyValueMetadataMergeStrategy.merge(keyValueMetaData);
     return new FileMetaData(schema, mergedKeyValues, createdByString);
   }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/KeyValueMetadataMergeStrategy.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/KeyValueMetadataMergeStrategy.java
new file mode 100644
index 0000000..d9152aa
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/KeyValueMetadataMergeStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * Strategy to merge metadata. Possible implementations:
+ * 1) Expect metadata to be exactly identical. Otherwise throw error (this is 
the default strategy implementation)
+ * 2) Custom strategy to merge metadata if the values are not identical. 
Example: concatenate values.
+ */
+public interface KeyValueMetadataMergeStrategy extends Serializable {
+  /**
+   * Merges the values collected per key into a single value per key.
+   *
+   * @param keyValueMetaData map from each key to all distinct values seen across files
+   * @return map with one merged value per key
+   */
+  Map<String, String> merge(Map<String, Set<String>> keyValueMetaData);
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/StrictKeyValueMetadataMergeStrategy.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/StrictKeyValueMetadataMergeStrategy.java
new file mode 100644
index 0000000..ca8e719
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/StrictKeyValueMetadataMergeStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ *  Strategy to throw errors if there are multiple values for a given key in 
metadata.
+ */
+public class StrictKeyValueMetadataMergeStrategy implements 
KeyValueMetadataMergeStrategy {
+  /**
+   * @param keyValueMetaData map from each key to all distinct values seen across files
+   * @return map with the single value per key
+   * @throws RuntimeException if any key has more than one distinct value
+   */
+  public Map<String, String> merge(Map<String, Set<String>> keyValueMetaData) {
+    Map<String, String> mergedKeyValues = new HashMap<String, String>();
+    for (Map.Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
+      if (entry.getValue().size() > 1) {
+        throw new RuntimeException("could not merge metadata: key " + 
entry.getKey() + " has conflicting values: " + entry.getValue());
+      }
+      mergedKeyValues.put(entry.getKey(), entry.getValue().iterator().next());
+    }
+    return mergedKeyValues;
+  }
+}
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 9a8e6b4..2b78917 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -1008,6 +1008,63 @@ public class TestParquetFileWriter {
     }
   }
 
+  @Test
+  public void testMergeMetadataWithConflictingKeyValues() {
+    Map<String, String> keyValues1 = new HashMap<String, String>() {{
+      put("a", "b");
+    }};
+    Map<String, String> keyValues2 = new HashMap<String, String>() {{
+      put("a", "c");
+    }};
+    FileMetaData md1 = new FileMetaData(
+      new MessageType("root1",
+        new PrimitiveType(REPEATED, BINARY, "a"),
+        new PrimitiveType(OPTIONAL, BINARY, "b")),
+     keyValues1, "test");
+    FileMetaData md2 = new FileMetaData(
+      new MessageType("root1",
+        new PrimitiveType(REPEATED, BINARY, "a"),
+        new PrimitiveType(OPTIONAL, BINARY, "b")),
+      keyValues2, "test");
+    GlobalMetaData merged = ParquetFileWriter.mergeInto(md2, 
ParquetFileWriter.mergeInto(md1, null));
+    try {
+      merged.merge(new StrictKeyValueMetadataMergeStrategy());
+      fail("Merge metadata is expected to fail because of conflicting key 
values");
+    } catch (RuntimeException e) {
+      // expected because of conflicting values
+      assertTrue(e.getMessage().contains("could not merge metadata"));
+    }
+
+    Map<String, String> mergedKeyValues = merged.merge(new 
ConcatenatingKeyValueMetadataMergeStrategy()).getKeyValueMetaData();
+    assertEquals(1, mergedKeyValues.size());
+    String mergedValue = mergedKeyValues.get("a");
+    assertTrue(mergedValue.equals("b,c") || mergedValue.equals("c,b"));
+  }
+
+  @Test
+  public void testMergeMetadataWithNoConflictingKeyValues() {
+    Map<String, String> keyValues1 = new HashMap<String, String>() {{
+      put("a", "b");
+    }};
+    Map<String, String> keyValues2 = new HashMap<String, String>() {{
+      put("c", "d");
+    }};
+    FileMetaData md1 = new FileMetaData(
+      new MessageType("root1",
+        new PrimitiveType(REPEATED, BINARY, "a"),
+        new PrimitiveType(OPTIONAL, BINARY, "b")),
+      keyValues1, "test");
+    FileMetaData md2 = new FileMetaData(
+      new MessageType("root1",
+        new PrimitiveType(REPEATED, BINARY, "a"),
+        new PrimitiveType(OPTIONAL, BINARY, "b")),
+      keyValues2, "test");
+    GlobalMetaData merged = ParquetFileWriter.mergeInto(md2, 
ParquetFileWriter.mergeInto(md1, null));
+    Map<String, String> mergedValues = merged.merge(new 
StrictKeyValueMetadataMergeStrategy()).getKeyValueMetaData();
+    assertEquals("b", mergedValues.get("a"));
+    assertEquals("d", mergedValues.get("c"));
+  }
+
   private org.apache.parquet.column.statistics.Statistics<?> statsC1(Binary... 
values) {
     org.apache.parquet.column.statistics.Statistics<?> stats = 
org.apache.parquet.column.statistics.Statistics
         .createStats(C1.getPrimitiveType());
diff --git 
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
 
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index fe64587..395928c 100644
--- 
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ 
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -19,20 +19,28 @@
 package org.apache.parquet.tools.command;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.parquet.hadoop.ParquetFileWriter;
+import 
org.apache.parquet.hadoop.metadata.ConcatenatingKeyValueMetadataMergeStrategy;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.tools.Main;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class MergeCommand extends ArgsOnlyCommand {
   public static final String[] USAGE = new String[] {
@@ -41,6 +49,37 @@ public class MergeCommand extends ArgsOnlyCommand {
           "   <output> is the destination parquet file"
   };
 
+  private static enum MergeStrategy {
+    STRICT,
+    CONCAT,
+    CUSTOM,
+  }
+  
+  public static final Options OPTIONS;
+
+  static {
+    OPTIONS = new Options();
+    String availableStrategies = 
Arrays.stream(MergeStrategy.values()).map(Enum::name).collect(Collectors.joining(",
 "));
+    Option mergeStrategy = Option.builder("s")
+      .longOpt("mergeStrategy")
+      .desc("Strategy to merge (key, value) pairs in metadata if there are 
multiple values for same key. " +
+        "Available strategies: " + availableStrategies.toLowerCase() + ". 
(default: '" + MergeStrategy.STRICT.name().toLowerCase() + "')." +
+        " You can provide your custom implementation by specifying '" + 
MergeStrategy.CUSTOM.name().toLowerCase() + "' strategy.")
+      .optionalArg(true)
+      .build();
+    
+    Option mergeStrategyClass = Option.builder("c")
+      .longOpt("mergeStrategyClass")
+      .desc("Custom strategy class to merge (key, value) pairs in metadata if 
there are multiple values for same key." +
+        "Valid only with " + MergeStrategy.CUSTOM + " mergeStrategy. This can 
be useful if strategies provided by parquet library are not sufficient. " + 
+        "Requires fully qualified class name. Note that the class specified 
has to be included in the classpath.")
+      .optionalArg(true)
+      .build();
+
+    OPTIONS.addOption(mergeStrategy);
+    OPTIONS.addOption(mergeStrategyClass);
+  }
+
   /**
    * Biggest number of input files we can merge.
    */
@@ -56,6 +95,11 @@ public class MergeCommand extends ArgsOnlyCommand {
   }
 
   @Override
+  public Options getOptions() {
+    return OPTIONS;
+  }
+
+  @Override
   public String[] getUsageDescription() {
     return USAGE;
   }
@@ -76,7 +120,7 @@ public class MergeCommand extends ArgsOnlyCommand {
     Path outputFile = new Path(args.get(args.size() - 1));
 
     // Merge schema and extraMeta
-    FileMetaData mergedMeta = mergedMetadata(inputFiles);
+    FileMetaData mergedMeta = mergedMetadata(inputFiles, 
getMergeStrategy(options, conf));
     PrintWriter out = new PrintWriter(Main.out, true);
 
     // Merge data
@@ -103,8 +147,33 @@ public class MergeCommand extends ArgsOnlyCommand {
     writer.end(mergedMeta.getKeyValueMetaData());
   }
 
-  private FileMetaData mergedMetadata(List<Path> inputFiles) throws 
IOException {
-    return ParquetFileWriter.mergeMetadataFiles(inputFiles, 
conf).getFileMetaData();
+  private FileMetaData mergedMetadata(List<Path> inputFiles, 
KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException 
{
+    return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf, 
keyValueMetadataMergeStrategy).getFileMetaData();
+  }
+
+  private KeyValueMetadataMergeStrategy getMergeStrategy(CommandLine options, 
Configuration conf) {
+    final MergeStrategy mergeStrategyOption;
+    if (options.hasOption('s')) {
+      mergeStrategyOption = 
MergeStrategy.valueOf(options.getOptionValue('s').toUpperCase());
+    } else {
+      mergeStrategyOption = MergeStrategy.STRICT;
+    }
+
+    switch(mergeStrategyOption) {
+      case STRICT:
+        return new StrictKeyValueMetadataMergeStrategy();
+      case CONCAT:
+        return new ConcatenatingKeyValueMetadataMergeStrategy();
+      case CUSTOM:
+        return loadCustomMergeStrategy(options.getOptionValue('c'));
+      default:
+        throw new IllegalArgumentException("Unknown merge strategy: " + 
mergeStrategyOption);
+    }
+  }
+
+  private KeyValueMetadataMergeStrategy loadCustomMergeStrategy(String 
mergeStrategyClass) {
+    Class<? extends KeyValueMetadataMergeStrategy> mergeStrategy = 
conf.getClass(mergeStrategyClass, null, KeyValueMetadataMergeStrategy.class);
+    return ReflectionUtils.newInstance(mergeStrategy, conf);
   }
 
   /**

Reply via email to