This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 0217061 PARQUET-1951: Allow merge strategies to combine key values
(#847)
0217061 is described below
commit 02170613d12b5122b07149136382a71505fa39d8
Author: satishkotha <[email protected]>
AuthorDate: Thu Jan 7 01:44:38 2021 -0800
PARQUET-1951: Allow merge strategies to combine key values (#847)
---
.../apache/parquet/hadoop/ParquetFileWriter.java | 39 ++++++++++-
...ConcatenatingKeyValueMetadataMergeStrategy.java | 61 +++++++++++++++++
.../parquet/hadoop/metadata/GlobalMetaData.java | 28 ++++----
.../metadata/KeyValueMetadataMergeStrategy.java | 42 ++++++++++++
.../StrictKeyValueMetadataMergeStrategy.java | 42 ++++++++++++
.../parquet/hadoop/TestParquetFileWriter.java | 57 ++++++++++++++++
.../apache/parquet/tools/command/MergeCommand.java | 79 ++++++++++++++++++++--
7 files changed, 329 insertions(+), 19 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 98699cf..c06e7a8 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -70,8 +70,10 @@ import
org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
+import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HadoopStreams;
@@ -1290,6 +1292,22 @@ public class ParquetFileWriter {
*/
@Deprecated
public static ParquetMetadata mergeMetadataFiles(List<Path> files,
Configuration conf) throws IOException {
+ return mergeMetadataFiles(files, conf, new
StrictKeyValueMetadataMergeStrategy());
+ }
+
+ /**
+ * Given a list of metadata files, merge them into a single ParquetMetadata
+ * Requires that the schemas be compatible; values for duplicate extraMetadata
keys are combined by the supplied merge strategy.
+ * @param files a list of files to merge metadata from
+ * @param conf a configuration
+ * @param keyValueMetadataMergeStrategy strategy to merge values for same
key, if there are multiple
+ * @return merged parquet metadata for the files
+ * @throws IOException if there is an error while writing
+ * @deprecated metadata files are not recommended and will be removed in
2.0.0
+ */
+ @Deprecated
+ public static ParquetMetadata mergeMetadataFiles(List<Path> files,
Configuration conf,
+
KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException
{
Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list
of metadata");
GlobalMetaData globalMetaData = null;
@@ -1303,7 +1321,7 @@ public class ParquetFileWriter {
}
// collapse GlobalMetaData into a single FileMetaData, which will throw if
they are not compatible
- return new ParquetMetadata(globalMetaData.merge(), blocks);
+ return new
ParquetMetadata(globalMetaData.merge(keyValueMetadataMergeStrategy), blocks);
}
/**
@@ -1384,7 +1402,24 @@ public class ParquetFileWriter {
metadata.close();
}
+ /**
+ * Will merge the metadata of all the footers together
+ * @param root the directory containing all footers
+ * @param footers the list of file footers to merge
+ * @return the global meta data for all the footers
+ */
static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
+ return mergeFooters(root, footers, new
StrictKeyValueMetadataMergeStrategy());
+ }
+
+ /**
+ * Will merge the metadata of all the footers together
+ * @param root the directory containing all footers
+ * @param footers the list of file footers to merge
+ * @param keyValueMergeStrategy strategy to merge values for a given key (if
there are multiple values)
+ * @return the global meta data for all the footers
+ */
+ static ParquetMetadata mergeFooters(Path root, List<Footer> footers,
KeyValueMetadataMergeStrategy keyValueMergeStrategy) {
String rootPath = root.toUri().getPath();
GlobalMetaData fileMetaData = null;
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
@@ -1403,7 +1438,7 @@ public class ParquetFileWriter {
blocks.add(block);
}
}
- return new ParquetMetadata(fileMetaData.merge(), blocks);
+ return new ParquetMetadata(fileMetaData.merge(keyValueMergeStrategy),
blocks);
}
/**
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ConcatenatingKeyValueMetadataMergeStrategy.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ConcatenatingKeyValueMetadataMergeStrategy.java
new file mode 100644
index 0000000..e121928
--- /dev/null
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ConcatenatingKeyValueMetadataMergeStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Strategy to concatenate if there are multiple values for a given key in
metadata.
+ * Note: use this with caution. This is expected to work only for certain use
cases.
+ */
+public class ConcatenatingKeyValueMetadataMergeStrategy implements
KeyValueMetadataMergeStrategy {
+ private static final String DEFAULT_DELIMITER = ",";
+
+ private final String delimiter;
+
+ /**
+ * Default constructor.
+ */
+ public ConcatenatingKeyValueMetadataMergeStrategy() {
+ this.delimiter = DEFAULT_DELIMITER;
+ }
+
+ /**
+ * Constructor to use different delimiter for concatenation.
+ *
+ * @param delim delimiter char sequence.
+ */
+ public ConcatenatingKeyValueMetadataMergeStrategy(String delim) {
+ this.delimiter = delim;
+ }
+
+ /**
+ * @param keyValueMetaData map from each metadata key to the set of values
seen across files; all values for a key are concatenated with the delimiter
+ */
+ public Map<String, String> merge(Map<String, Set<String>> keyValueMetaData) {
+ Map<String, String> mergedKeyValues = new HashMap<String, String>();
+ for (Map.Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
+ mergedKeyValues.put(entry.getKey(),
entry.getValue().stream().collect(Collectors.joining(this.delimiter)));
+ }
+ return mergedKeyValues;
+ }
+}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
index 311e323..ff2ef64 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,10 +21,8 @@ package org.apache.parquet.hadoop.metadata;
import static java.util.Collections.unmodifiableMap;
import java.io.Serializable;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.parquet.schema.MessageType;
@@ -87,19 +85,25 @@ public class GlobalMetaData implements Serializable {
* Will merge the metadata as if it was coming from a single file.
* (for all part files written together this will always work)
* If there are conflicting values an exception will be thrown
+ *
+ * Provided for backward compatibility
* @return the merged version of this
*/
public FileMetaData merge() {
+ return merge(new StrictKeyValueMetadataMergeStrategy());
+ }
+
+ /**
+ * Will merge the metadata as if it was coming from a single file.
+ * (for all part files written together this will always work)
+ * Conflicting values for the same key are resolved by the supplied strategy.
+ * @param keyValueMetadataMergeStrategy strategy used to combine multiple
values recorded for the same key
+ * @return the merged version of this
+ */
+ public FileMetaData merge(KeyValueMetadataMergeStrategy
keyValueMetadataMergeStrategy) {
String createdByString = createdBy.size() == 1 ?
createdBy.iterator().next() :
createdBy.toString();
- Map<String, String> mergedKeyValues = new HashMap<String, String>();
- for (Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
- if (entry.getValue().size() > 1) {
- throw new RuntimeException("could not merge metadata: key " +
entry.getKey() + " has conflicting values: " + entry.getValue());
- }
- mergedKeyValues.put(entry.getKey(), entry.getValue().iterator().next());
- }
+ Map<String, String> mergedKeyValues =
keyValueMetadataMergeStrategy.merge(keyValueMetaData);
return new FileMetaData(schema, mergedKeyValues, createdByString);
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/KeyValueMetadataMergeStrategy.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/KeyValueMetadataMergeStrategy.java
new file mode 100644
index 0000000..d9152aa
--- /dev/null
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/KeyValueMetadataMergeStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * Strategy to merge metadata. Possible implementations:
+ * 1) Expect metadata to be exactly identical. Otherwise throw error (this is
the default strategy implementation)
+ * 2) Custom strategy to merge metadata if the values are not identical.
Example: concatenate values.
+ */
+public interface KeyValueMetadataMergeStrategy extends Serializable {
+ /**
+ * @param keyValueMetaData map from each metadata key to the set of values
seen across files
+ * @return merged metadata with a single value chosen or combined per key
+ */
+ Map<String, String> merge(Map<String, Set<String>> keyValueMetaData);
+}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/StrictKeyValueMetadataMergeStrategy.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/StrictKeyValueMetadataMergeStrategy.java
new file mode 100644
index 0000000..ca8e719
--- /dev/null
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/StrictKeyValueMetadataMergeStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Strategy to throw errors if there are multiple values for a given key in
metadata.
+ */
+public class StrictKeyValueMetadataMergeStrategy implements
KeyValueMetadataMergeStrategy {
+ /**
+ * @param keyValueMetaData map from each metadata key to the set of values
seen across files; throws if any key has more than one value
+ */
+ public Map<String, String> merge(Map<String, Set<String>> keyValueMetaData) {
+ Map<String, String> mergedKeyValues = new HashMap<String, String>();
+ for (Map.Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
+ if (entry.getValue().size() > 1) {
+ throw new RuntimeException("could not merge metadata: key " +
entry.getKey() + " has conflicting values: " + entry.getValue());
+ }
+ mergedKeyValues.put(entry.getKey(), entry.getValue().iterator().next());
+ }
+ return mergedKeyValues;
+ }
+}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 9a8e6b4..2b78917 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -1008,6 +1008,63 @@ public class TestParquetFileWriter {
}
}
+ @Test
+ public void testMergeMetadataWithConflictingKeyValues() {
+ Map<String, String> keyValues1 = new HashMap<String, String>() {{
+ put("a", "b");
+ }};
+ Map<String, String> keyValues2 = new HashMap<String, String>() {{
+ put("a", "c");
+ }};
+ FileMetaData md1 = new FileMetaData(
+ new MessageType("root1",
+ new PrimitiveType(REPEATED, BINARY, "a"),
+ new PrimitiveType(OPTIONAL, BINARY, "b")),
+ keyValues1, "test");
+ FileMetaData md2 = new FileMetaData(
+ new MessageType("root1",
+ new PrimitiveType(REPEATED, BINARY, "a"),
+ new PrimitiveType(OPTIONAL, BINARY, "b")),
+ keyValues2, "test");
+ GlobalMetaData merged = ParquetFileWriter.mergeInto(md2,
ParquetFileWriter.mergeInto(md1, null));
+ try {
+ merged.merge(new StrictKeyValueMetadataMergeStrategy());
+ fail("Merge metadata is expected to fail because of conflicting key
values");
+ } catch (RuntimeException e) {
+ // expected because of conflicting values
+ assertTrue(e.getMessage().contains("could not merge metadata"));
+ }
+
+ Map<String, String> mergedKeyValues = merged.merge(new
ConcatenatingKeyValueMetadataMergeStrategy()).getKeyValueMetaData();
+ assertEquals(1, mergedKeyValues.size());
+ String mergedValue = mergedKeyValues.get("a");
+ assertTrue(mergedValue.equals("b,c") || mergedValue.equals("c,b"));
+ }
+
+ @Test
+ public void testMergeMetadataWithNoConflictingKeyValues() {
+ Map<String, String> keyValues1 = new HashMap<String, String>() {{
+ put("a", "b");
+ }};
+ Map<String, String> keyValues2 = new HashMap<String, String>() {{
+ put("c", "d");
+ }};
+ FileMetaData md1 = new FileMetaData(
+ new MessageType("root1",
+ new PrimitiveType(REPEATED, BINARY, "a"),
+ new PrimitiveType(OPTIONAL, BINARY, "b")),
+ keyValues1, "test");
+ FileMetaData md2 = new FileMetaData(
+ new MessageType("root1",
+ new PrimitiveType(REPEATED, BINARY, "a"),
+ new PrimitiveType(OPTIONAL, BINARY, "b")),
+ keyValues2, "test");
+ GlobalMetaData merged = ParquetFileWriter.mergeInto(md2,
ParquetFileWriter.mergeInto(md1, null));
+ Map<String, String> mergedValues = merged.merge(new
StrictKeyValueMetadataMergeStrategy()).getKeyValueMetaData();
+ assertEquals("b", mergedValues.get("a"));
+ assertEquals("d", mergedValues.get("c"));
+ }
+
private org.apache.parquet.column.statistics.Statistics<?> statsC1(Binary...
values) {
org.apache.parquet.column.statistics.Statistics<?> stats =
org.apache.parquet.column.statistics.Statistics
.createStats(C1.getPrimitiveType());
diff --git
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index fe64587..395928c 100644
---
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -19,20 +19,28 @@
package org.apache.parquet.tools.command;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.hadoop.ParquetFileWriter;
+import
org.apache.parquet.hadoop.metadata.ConcatenatingKeyValueMetadataMergeStrategy;
import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.tools.Main;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
public class MergeCommand extends ArgsOnlyCommand {
public static final String[] USAGE = new String[] {
@@ -41,6 +49,37 @@ public class MergeCommand extends ArgsOnlyCommand {
" <output> is the destination parquet file"
};
+ private static enum MergeStrategy {
+ STRICT,
+ CONCAT,
+ CUSTOM,
+ }
+
+ public static final Options OPTIONS;
+
+ static {
+ OPTIONS = new Options();
+ String availableStrategies =
Arrays.stream(MergeStrategy.values()).map(Enum::name).collect(Collectors.joining(",
"));
+ Option mergeStrategy = Option.builder("s")
+ .longOpt("mergeStrategy")
+ .desc("Strategy to merge (key, value) pairs in metadata if there are
multiple values for same key. " +
+ "Available strategies: " + availableStrategies.toLowerCase() + ".
(default: '" + MergeStrategy.STRICT.name().toLowerCase() + "')." +
+ " You can provide your custom implementation by specifying '" +
MergeStrategy.CUSTOM.name().toLowerCase() + "' strategy.")
+ .optionalArg(true)
+ .build();
+
+ Option mergeStrategyClass = Option.builder("c")
+ .longOpt("mergeStrategyClass")
+ .desc("Custom strategy class to merge (key, value) pairs in metadata if
there are multiple values for same key. " +
+ "Valid only with " + MergeStrategy.CUSTOM + " mergeStrategy. This can
be useful if strategies provided by parquet library are not sufficient. " +
+ "Requires fully qualified class name. Note that the class specified
has to be included in the classpath.")
+ .optionalArg(true)
+ .build();
+
+ OPTIONS.addOption(mergeStrategy);
+ OPTIONS.addOption(mergeStrategyClass);
+ }
+
/**
* Biggest number of input files we can merge.
*/
@@ -56,6 +95,11 @@ public class MergeCommand extends ArgsOnlyCommand {
}
@Override
+ public Options getOptions() {
+ return OPTIONS;
+ }
+
+ @Override
public String[] getUsageDescription() {
return USAGE;
}
@@ -76,7 +120,7 @@ public class MergeCommand extends ArgsOnlyCommand {
Path outputFile = new Path(args.get(args.size() - 1));
// Merge schema and extraMeta
- FileMetaData mergedMeta = mergedMetadata(inputFiles);
+ FileMetaData mergedMeta = mergedMetadata(inputFiles,
getMergeStrategy(options, conf));
PrintWriter out = new PrintWriter(Main.out, true);
// Merge data
@@ -103,8 +147,33 @@ public class MergeCommand extends ArgsOnlyCommand {
writer.end(mergedMeta.getKeyValueMetaData());
}
- private FileMetaData mergedMetadata(List<Path> inputFiles) throws
IOException {
- return ParquetFileWriter.mergeMetadataFiles(inputFiles,
conf).getFileMetaData();
+ private FileMetaData mergedMetadata(List<Path> inputFiles,
KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException
{
+ return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf,
keyValueMetadataMergeStrategy).getFileMetaData();
+ }
+
+ private KeyValueMetadataMergeStrategy getMergeStrategy(CommandLine options,
Configuration conf) {
+ final MergeStrategy mergeStrategyOption;
+ if (options.hasOption('s')) {
+ mergeStrategyOption =
MergeStrategy.valueOf(options.getOptionValue('s').toUpperCase());
+ } else {
+ mergeStrategyOption = MergeStrategy.STRICT;
+ }
+
+ switch(mergeStrategyOption) {
+ case STRICT:
+ return new StrictKeyValueMetadataMergeStrategy();
+ case CONCAT:
+ return new ConcatenatingKeyValueMetadataMergeStrategy();
+ case CUSTOM:
+ return loadCustomMergeStrategy(options.getOptionValue('c'));
+ default:
+ throw new IllegalArgumentException("Unknown merge strategy: " +
mergeStrategyOption);
+ }
+ }
+
+ private KeyValueMetadataMergeStrategy loadCustomMergeStrategy(String
mergeStrategyClass) {
+ Class<? extends KeyValueMetadataMergeStrategy> mergeStrategy =
conf.getClass(mergeStrategyClass, null, KeyValueMetadataMergeStrategy.class);
+ return ReflectionUtils.newInstance(mergeStrategy, conf);
}
/**