http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java new file mode 100644 index 0000000..675f4bf --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java @@ -0,0 +1,842 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AtomicDouble; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.hadoop.WritableCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import 
org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link HadoopInputFormatIO} is a Transform for reading data from any source which + * implements Hadoop {@link InputFormat}. For example- Cassandra, Elasticsearch, HBase, Redis, + * Postgres etc. {@link HadoopInputFormatIO} has to make several performance trade-offs in + * connecting to {@link InputFormat}, so if there is another Beam IO Transform specifically for + * connecting to your data source of choice, we would recommend using that one, but this IO + * Transform allows you to connect to many data sources that do not yet have a Beam IO Transform. + * + * <p>You will need to pass a Hadoop {@link Configuration} with parameters specifying how the read + * will occur. Many properties of the Configuration are optional, and some are required for certain + * {@link InputFormat} classes, but the following properties must be set for all InputFormats: + * <ul> + * <li>{@code mapreduce.job.inputformat.class}: The {@link InputFormat} class used to connect to + * your data source of choice.</li> + * <li>{@code key.class}: The key class returned by the {@link InputFormat} in + * {@code mapreduce.job.inputformat.class}.</li> + * <li>{@code value.class}: The value class returned by the {@link InputFormat} in + * {@code mapreduce.job.inputformat.class}.</li> + * </ul> + * For example: + * + * <pre> + * { + * Configuration myHadoopConfiguration = new Configuration(false); + * // Set Hadoop InputFormat, key and value class in configuration + * myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", + * MyDbInputFormatClass, InputFormat.class); + * myHadoopConfiguration.setClass("key.class", MyDbInputFormatKeyClass, Object.class); + * myHadoopConfiguration.setClass("value.class", + * MyDbInputFormatValueClass, Object.class); + * } + * </pre> + * + * <p>You will need to check to see if the key and value classes output by the {@link InputFormat} + * have a Beam {@link Coder} available. If not, you can use withKeyTranslation/withValueTranslation + * to specify a method transforming instances of those classes into another class that is supported + * by a Beam {@link Coder}. These settings are optional and you don't need to specify translation + * for both key and value. If you specify a translation, you will need to make sure the K or V of + * the read transform match the output type of the translation. + * + * <p>You will need to set appropriate InputFormat key and value class (i.e. "key.class" and + * "value.class") in Hadoop {@link Configuration}. If you set different InputFormat key or + * value class than InputFormat's actual key or value class then, it may result in an error like + * "unexpected extra bytes after decoding" while the decoding process of key/value object happens. + * Hence, it is important to set appropriate InputFormat key and value class. + * + * <h3>Reading using {@link HadoopInputFormatIO}</h3> + * + * <pre> + * {@code + * Pipeline p = ...; // Create pipeline. + * // Read data only with Hadoop configuration. 
+ * p.apply("read", + * HadoopInputFormatIO.<InputFormatKeyClass, InputFormatValueClass>read() + * .withConfiguration(myHadoopConfiguration)); + * } + * </pre> + * + * <p>// Read data with configuration and key translation (Example scenario: Beam Coder is not + * available for the key class, hence key translation is required.). + * + * <pre> + * {@code + * SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType = + * new SimpleFunction<InputFormatKeyClass, MyKeyClass>() { + * public MyKeyClass apply(InputFormatKeyClass input) { + * // ...logic to transform InputFormatKeyClass to MyKeyClass + * } + * }; + * } + * </pre> + * + * <pre> + * {@code + * p.apply("read", + * HadoopInputFormatIO.<MyKeyClass, InputFormatValueClass>read() + * .withConfiguration(myHadoopConfiguration) + * .withKeyTranslation(myOutputKeyType)); + * } + * </pre> + * + * <p>// Read data with configuration and value translation (Example scenario: Beam Coder is not + * available for the value class, hence value translation is required.). + * + * <pre> + * {@code + * SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType = + * new SimpleFunction<InputFormatValueClass, MyValueClass>() { + * public MyValueClass apply(InputFormatValueClass input) { + * // ...logic to transform InputFormatValueClass to MyValueClass + * } + * }; + * } + * </pre> + * + * <pre> + * {@code + * p.apply("read", + * HadoopInputFormatIO.<InputFormatKeyClass, MyValueClass>read() + * .withConfiguration(myHadoopConfiguration) + * .withValueTranslation(myOutputValueType)); + * } + * </pre> + */ + +public class HadoopInputFormatIO { + private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatIO.class); + + /** + * Creates an uninitialized {@link HadoopInputFormatIO.Read}. Before use, the {@code Read} must + * be initialized with a {@link HadoopInputFormatIO.Read#withConfiguration withConfiguration(Configuration)} that + * specifies the source. A key/value translation may also optionally be specified using + * {@link HadoopInputFormatIO.Read#withKeyTranslation}/ + * {@link HadoopInputFormatIO.Read#withValueTranslation}. + */ + public static <K, V> Read<K, V> read() { + return new AutoValue_HadoopInputFormatIO_Read.Builder<K, V>().build(); + } + + /** + * A {@link PTransform} that reads from any data source which implements Hadoop InputFormat, + * e.g. Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. See the class-level Javadoc on + * {@link HadoopInputFormatIO} for more information. + * @param <K> Type of keys to be read. + * @param <V> Type of values to be read. + * @see HadoopInputFormatIO + */ + @AutoValue + public abstract static class Read<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { + + // Returns the Hadoop Configuration which contains the specification of the source.
+ @Nullable + public abstract SerializableConfiguration getConfiguration(); + + @Nullable public abstract SimpleFunction<?, K> getKeyTranslationFunction(); + @Nullable public abstract SimpleFunction<?, V> getValueTranslationFunction(); + @Nullable public abstract TypeDescriptor<K> getKeyTypeDescriptor(); + @Nullable public abstract TypeDescriptor<V> getValueTypeDescriptor(); + @Nullable public abstract TypeDescriptor<?> getinputFormatClass(); + @Nullable public abstract TypeDescriptor<?> getinputFormatKeyClass(); + @Nullable public abstract TypeDescriptor<?> getinputFormatValueClass(); + + abstract Builder<K, V> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<K, V> { + abstract Builder<K, V> setConfiguration(SerializableConfiguration configuration); + abstract Builder<K, V> setKeyTranslationFunction(SimpleFunction<?, K> function); + abstract Builder<K, V> setValueTranslationFunction(SimpleFunction<?, V> function); + abstract Builder<K, V> setKeyTypeDescriptor(TypeDescriptor<K> keyTypeDescriptor); + abstract Builder<K, V> setValueTypeDescriptor(TypeDescriptor<V> valueTypeDescriptor); + abstract Builder<K, V> setInputFormatClass(TypeDescriptor<?> inputFormatClass); + abstract Builder<K, V> setInputFormatKeyClass(TypeDescriptor<?> inputFormatKeyClass); + abstract Builder<K, V> setInputFormatValueClass(TypeDescriptor<?> inputFormatValueClass); + abstract Read<K, V> build(); + } + + /** + * Returns a new {@link HadoopInputFormatIO.Read} that will read from the source using the + * options provided by the given configuration. + * + * <p>Does not modify this object. + */ + public Read<K, V> withConfiguration(Configuration configuration) { + validateConfiguration(configuration); + TypeDescriptor<?> inputFormatClass = + TypeDescriptor.of(configuration.getClass("mapreduce.job.inputformat.class", null)); + TypeDescriptor<?> inputFormatKeyClass = + TypeDescriptor.of(configuration.getClass("key.class", null)); + TypeDescriptor<?> inputFormatValueClass = + TypeDescriptor.of(configuration.getClass("value.class", null)); + Builder<K, V> builder = + toBuilder().setConfiguration(new SerializableConfiguration(configuration)); + builder.setInputFormatClass(inputFormatClass); + builder.setInputFormatKeyClass(inputFormatKeyClass); + builder.setInputFormatValueClass(inputFormatValueClass); + /* + * Sets the output key class to InputFormat key class if withKeyTranslation() is not called + * yet. + */ + if (getKeyTranslationFunction() == null) { + builder.setKeyTypeDescriptor((TypeDescriptor<K>) inputFormatKeyClass); + } + /* + * Sets the output value class to InputFormat value class if withValueTranslation() is not + * called yet. + */ + if (getValueTranslationFunction() == null) { + builder.setValueTypeDescriptor((TypeDescriptor<V>) inputFormatValueClass); + } + return builder.build(); + } + + /** + * Returns a new {@link HadoopInputFormatIO.Read} that will transform the keys read from the + * source using the given key translation function. + * + * <p>Does not modify this object. + */ + public Read<K, V> withKeyTranslation(SimpleFunction<?, K> function) { + checkNotNull(function, "function"); + // Sets key class to key translation function's output class type. + return toBuilder().setKeyTranslationFunction(function) + .setKeyTypeDescriptor((TypeDescriptor<K>) function.getOutputTypeDescriptor()).build(); + } + + /** + * Returns a new {@link HadoopInputFormatIO.Read} that will transform the values read from the + * source using the given value translation function. 
+ * + * <p>Does not modify this object. + */ + public Read<K, V> withValueTranslation(SimpleFunction<?, V> function) { + checkNotNull(function, "function"); + // Sets the value class to the value translation function's output class type. + return toBuilder().setValueTranslationFunction(function) + .setValueTypeDescriptor((TypeDescriptor<V>) function.getOutputTypeDescriptor()).build(); + } + + @Override + public PCollection<KV<K, V>> expand(PBegin input) { + // Get the key and value coders based on the key and value classes. + CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry(); + Coder<K> keyCoder = getDefaultCoder(getKeyTypeDescriptor(), coderRegistry); + Coder<V> valueCoder = getDefaultCoder(getValueTypeDescriptor(), coderRegistry); + HadoopInputFormatBoundedSource<K, V> source = new HadoopInputFormatBoundedSource<K, V>( + getConfiguration(), + keyCoder, + valueCoder, + getKeyTranslationFunction(), + getValueTranslationFunction()); + return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); + } + + /** + * Validates that the mandatory configuration properties, such as the InputFormat class and the + * InputFormat key and value classes, are provided in the Hadoop configuration. + */ + private void validateConfiguration(Configuration configuration) { + checkNotNull(configuration, "configuration"); + checkNotNull(configuration.get("mapreduce.job.inputformat.class"), + "configuration.get(\"mapreduce.job.inputformat.class\")"); + checkNotNull(configuration.get("key.class"), "configuration.get(\"key.class\")"); + checkNotNull(configuration.get("value.class"), + "configuration.get(\"value.class\")"); + } + + /** + * Validates inputs provided by the pipeline user before reading the data. + */ + @Override + public void validate(PBegin input) { + checkNotNull(getConfiguration(), "getConfiguration()"); + // Validate that the key translation input type is the same as the key class of the InputFormat. + validateTranslationFunction(getinputFormatKeyClass(), getKeyTranslationFunction(), + "Key translation's input type is not same as hadoop InputFormat : %s key class : %s"); + // Validate that the value translation input type is the same as the value class of the InputFormat. + validateTranslationFunction(getinputFormatValueClass(), getValueTranslationFunction(), + "Value translation's input type is not same as hadoop InputFormat : " + + "%s value class : %s"); + } + + /** + * Validates the translation function given for key/value translation. + */ + private void validateTranslationFunction(TypeDescriptor<?> inputType, + SimpleFunction<?, ?> simpleFunction, String errorMsg) { + if (simpleFunction != null) { + if (!simpleFunction.getInputTypeDescriptor().equals(inputType)) { + throw new IllegalArgumentException( + String.format(errorMsg, getinputFormatClass().getRawType(), inputType.getRawType())); + } + } + } + + /** + * Returns the default coder for a given type descriptor. The Coder Registry is queried for the + * correct coder; if none is found there and the provided type descriptor is a Writable, a + * WritableCoder is returned, otherwise an exception is thrown: "Cannot find coder".
+ */ + public <T> Coder<T> getDefaultCoder(TypeDescriptor<?> typeDesc, CoderRegistry coderRegistry) { + Class classType = typeDesc.getRawType(); + try { + return (Coder<T>) coderRegistry.getCoder(typeDesc); + } catch (CannotProvideCoderException e) { + if (Writable.class.isAssignableFrom(classType)) { + return (Coder<T>) WritableCoder.of(classType); + } + throw new IllegalStateException(String.format("Cannot find coder for %s : ", typeDesc) + + e.getMessage(), e); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + if (getConfiguration().getHadoopConfiguration() != null) { + Iterator<Entry<String, String>> configProperties = getConfiguration() + .getHadoopConfiguration().iterator(); + while (configProperties.hasNext()) { + Entry<String, String> property = configProperties.next(); + builder.addIfNotNull(DisplayData.item(property.getKey(), property.getValue()) + .withLabel(property.getKey())); + } + } + } + } + + /** + * Bounded source implementation for {@link HadoopInputFormatIO}. + * @param <K> Type of keys to be read. + * @param <V> Type of values to be read. + */ + public static class HadoopInputFormatBoundedSource<K, V> extends BoundedSource<KV<K, V>> + implements Serializable { + private final SerializableConfiguration conf; + private final Coder<K> keyCoder; + private final Coder<V> valueCoder; + @Nullable private final SimpleFunction<?, K> keyTranslationFunction; + @Nullable private final SimpleFunction<?, V> valueTranslationFunction; + private final SerializableSplit inputSplit; + private transient List<SerializableSplit> inputSplits; + private long boundedSourceEstimatedSize = 0; + private transient InputFormat<?, ?> inputFormatObj; + private transient TaskAttemptContext taskAttemptContext; + private static final Set<Class<?>> immutableTypes = new HashSet<Class<?>>( + Arrays.asList( + String.class, + Byte.class, + Short.class, + Integer.class, + Long.class, + Float.class, + Double.class, + Boolean.class, + BigInteger.class, + BigDecimal.class)); + + HadoopInputFormatBoundedSource( + SerializableConfiguration conf, + Coder<K> keyCoder, + Coder<V> valueCoder, + @Nullable SimpleFunction<?, K> keyTranslationFunction, + @Nullable SimpleFunction<?, V> valueTranslationFunction) { + this(conf, + keyCoder, + valueCoder, + keyTranslationFunction, + valueTranslationFunction, + null); + } + + protected HadoopInputFormatBoundedSource( + SerializableConfiguration conf, + Coder<K> keyCoder, + Coder<V> valueCoder, + @Nullable SimpleFunction<?, K> keyTranslationFunction, + @Nullable SimpleFunction<?, V> valueTranslationFunction, + SerializableSplit inputSplit) { + this.conf = conf; + this.inputSplit = inputSplit; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + this.keyTranslationFunction = keyTranslationFunction; + this.valueTranslationFunction = valueTranslationFunction; + } + + public SerializableConfiguration getConfiguration() { + return conf; + } + + @Override + public void validate() { + checkNotNull(conf, "conf"); + checkNotNull(keyCoder, "keyCoder"); + checkNotNull(valueCoder, "valueCoder"); + } + + @Override + public List<BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes, + PipelineOptions options) throws Exception { + // desiredBundleSizeBytes is not being considered as splitting based on this + // value is not supported by inputFormat getSplits() method. 
+ if (inputSplit != null) { + LOG.info("Not splitting source {} because source is already split.", this); + return ImmutableList.of((BoundedSource<KV<K, V>>) this); + } + computeSplitsIfNecessary(); + LOG.info("Generated {} splits. Size of first split is {} ", inputSplits.size(), inputSplits + .get(0).getSplit().getLength()); + return Lists.transform(inputSplits, + new Function<SerializableSplit, BoundedSource<KV<K, V>>>() { + @Override + public BoundedSource<KV<K, V>> apply(SerializableSplit serializableInputSplit) { + HadoopInputFormatBoundedSource<K, V> hifBoundedSource = + new HadoopInputFormatBoundedSource<K, V>(conf, keyCoder, valueCoder, + keyTranslationFunction, valueTranslationFunction, serializableInputSplit); + return hifBoundedSource; + } + }); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions po) throws Exception { + if (inputSplit == null) { + // If there are no splits computed yet, then retrieve the splits. + computeSplitsIfNecessary(); + return boundedSourceEstimatedSize; + } + return inputSplit.getSplit().getLength(); + } + + /** + * Helper method to compute splits. It also calculates the size of the data being read. Note: + * This method is executed exactly once; the computed splits are cached in this source and are + * reused by splitIntoBundles() and getEstimatedSizeBytes(). + */ + @VisibleForTesting + void computeSplitsIfNecessary() throws IOException, InterruptedException { + if (inputSplits != null) { + return; + } + createInputFormatInstance(); + List<InputSplit> splits = + inputFormatObj.getSplits(Job.getInstance(conf.getHadoopConfiguration())); + if (splits == null) { + throw new IOException("Error in computing splits, getSplits() returns null."); + } + if (splits.isEmpty()) { + throw new IOException("Error in computing splits, getSplits() returns an empty list"); + } + boundedSourceEstimatedSize = 0; + inputSplits = new ArrayList<SerializableSplit>(); + for (InputSplit inputSplit : splits) { + if (inputSplit == null) { + throw new IOException("Error in computing splits, split is null in InputSplits list " + + "populated by getSplits() : "); + } + boundedSourceEstimatedSize += inputSplit.getLength(); + inputSplits.add(new SerializableSplit(inputSplit)); + } + } + + /** + * Creates an instance of the InputFormat class. The InputFormat class name is specified in the + * Hadoop configuration. + */ + protected void createInputFormatInstance() throws IOException { + if (inputFormatObj == null) { + try { + taskAttemptContext = + new TaskAttemptContextImpl(conf.getHadoopConfiguration(), new TaskAttemptID()); + inputFormatObj = + (InputFormat<?, ?>) conf + .getHadoopConfiguration() + .getClassByName( + conf.getHadoopConfiguration().get("mapreduce.job.inputformat.class")) + .newInstance(); + /* + * If the InputFormat explicitly implements the {@link Configurable} interface, then the + * setConf() method of {@link Configurable} needs to be called explicitly to set all the + * configuration parameters. For example: InputFormat classes which implement Configurable + * are {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat DBInputFormat}, {@link + * org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
+ */ + if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) { + ((Configurable) inputFormatObj).setConf(conf.getHadoopConfiguration()); + } + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new IOException("Unable to create InputFormat object: ", e); + } + } + } + + @VisibleForTesting + InputFormat<?, ?> getInputFormat(){ + return inputFormatObj; + } + + @VisibleForTesting + void setInputFormatObj(InputFormat<?, ?> inputFormatObj) { + this.inputFormatObj = inputFormatObj; + } + + @Override + public Coder<KV<K, V>> getDefaultOutputCoder() { + return KvCoder.of(keyCoder, valueCoder); + } + + @Override + public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException { + this.validate(); + if (inputSplit == null) { + throw new IOException("Cannot create reader as source is not split yet."); + } else { + createInputFormatInstance(); + return new HadoopInputFormatReader<>( + this, + keyTranslationFunction, + valueTranslationFunction, + inputSplit, + inputFormatObj, + taskAttemptContext); + } + } + + /** + * BoundedReader for Hadoop InputFormat source. + * + * @param <K> Type of keys RecordReader emits. + * @param <V> Type of values RecordReader emits. + */ + class HadoopInputFormatReader<T1, T2> extends BoundedSource.BoundedReader<KV<K, V>> { + + private final HadoopInputFormatBoundedSource<K, V> source; + @Nullable private final SimpleFunction<T1, K> keyTranslationFunction; + @Nullable private final SimpleFunction<T2, V> valueTranslationFunction; + private final SerializableSplit split; + private RecordReader<T1, T2> recordReader; + private volatile boolean doneReading = false; + private AtomicLong recordsReturned = new AtomicLong(); + // Tracks the progress of the RecordReader. 
+ private AtomicDouble progressValue = new AtomicDouble(); + private transient InputFormat<T1, T2> inputFormatObj; + private transient TaskAttemptContext taskAttemptContext; + + private HadoopInputFormatReader(HadoopInputFormatBoundedSource<K, V> source, + @Nullable SimpleFunction keyTranslationFunction, + @Nullable SimpleFunction valueTranslationFunction, + SerializableSplit split, + InputFormat inputFormatObj, + TaskAttemptContext taskAttemptContext) { + this.source = source; + this.keyTranslationFunction = keyTranslationFunction; + this.valueTranslationFunction = valueTranslationFunction; + this.split = split; + this.inputFormatObj = inputFormatObj; + this.taskAttemptContext = taskAttemptContext; + } + + @Override + public HadoopInputFormatBoundedSource<K, V> getCurrentSource() { + return source; + } + + @Override + public boolean start() throws IOException { + try { + recordsReturned.set(0L); + recordReader = + (RecordReader<T1, T2>) inputFormatObj.createRecordReader(split.getSplit(), + taskAttemptContext); + if (recordReader != null) { + recordReader.initialize(split.getSplit(), taskAttemptContext); + progressValue.set(getProgress()); + if (recordReader.nextKeyValue()) { + recordsReturned.incrementAndGet(); + doneReading = false; + return true; + } + } else { + throw new IOException(String.format("Null RecordReader object returned by %s", + inputFormatObj.getClass())); + } + recordReader = null; + } catch (InterruptedException e) { + throw new IOException( + "Could not read because the thread got interrupted while " + + "reading the records with an exception: ", + e); + } + doneReading = true; + return false; + } + + @Override + public boolean advance() throws IOException { + try { + progressValue.set(getProgress()); + if (recordReader.nextKeyValue()) { + recordsReturned.incrementAndGet(); + return true; + } + doneReading = true; + } catch (InterruptedException e) { + throw new IOException("Unable to read data: ", e); + } + return false; + } + + @Override + public KV<K, V> getCurrent() { + K key = null; + V value = null; + try { + // Transform key if translation function is provided. + key = + transformKeyOrValue((T1) recordReader.getCurrentKey(), keyTranslationFunction, + keyCoder); + // Transform value if translation function is provided. + value = + transformKeyOrValue((T2) recordReader.getCurrentValue(), valueTranslationFunction, + valueCoder); + } catch (IOException | InterruptedException e) { + LOG.error("Unable to read data: ", e); + throw new IllegalStateException("Unable to read data: ", e); + } + return KV.of(key, value); + } + + /** + * Returns the transformed key or value object, cloned if it is possibly mutable. + * @throws ClassCastException + * @throws CoderException + */ + private <T, T3> T3 transformKeyOrValue(T input, + @Nullable SimpleFunction<T, T3> simpleFunction, Coder<T3> coder) throws CoderException, + ClassCastException { + T3 output; + if (null != simpleFunction) { + output = simpleFunction.apply(input); + } else { + output = (T3) input; + } + return cloneIfPossiblyMutable((T3) output, coder); + } + + /** + * Beam expects immutable objects, but the Hadoop InputFormats tend to re-use the same object + * when returning them. Hence, mutable objects returned by Hadoop InputFormats are cloned. + */ + private <T> T cloneIfPossiblyMutable(T input, Coder<T> coder) throws CoderException, + ClassCastException { + // If the input object is not of a known immutable type, clone the object.
+ if (!isKnownImmutable(input)) { + input = CoderUtils.clone(coder, input); + } + return input; + } + + /** + * Utility method to check if the passed object is of a known immutable type. + */ + private boolean isKnownImmutable(Object o) { + return immutableTypes.contains(o.getClass()); + } + + @Override + public void close() throws IOException { + LOG.info("Closing reader after reading {} records.", recordsReturned); + if (recordReader != null) { + recordReader.close(); + recordReader = null; + } + } + + @Override + public Double getFractionConsumed() { + if (doneReading) { + return 1.0; + } else if (recordReader == null || recordsReturned.get() == 0L) { + return 0.0; + } + if (progressValue.get() == 0.0) { + return null; + } + return progressValue.doubleValue(); + } + + /** + * Returns RecordReader's progress. + * @throws IOException + * @throws InterruptedException + */ + private Double getProgress() throws IOException, InterruptedException { + try { + float progress = recordReader.getProgress(); + return (double) progress < 0 || progress > 1 ? 0.0 : progress; + } catch (IOException e) { + LOG.error( + "Error in computing the fractions consumed as RecordReader.getProgress() throws an " + + "exception : " + "{}", e); + throw new IOException( + "Error in computing the fractions consumed as RecordReader.getProgress() throws an " + + "exception : " + e.getMessage(), e); + } + } + + @Override + public final long getSplitPointsRemaining() { + if (doneReading) { + return 0; + } + /** + * This source does not currently support dynamic work rebalancing, so remaining parallelism + * is always 1. + */ + return 1; + } + } + } + + /** + * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit} to be serialized using + * Java's standard serialization mechanisms. + */ + public static class SerializableSplit implements Serializable { + + InputSplit inputSplit; + + public SerializableSplit() {} + + public SerializableSplit(InputSplit split) { + checkArgument(split instanceof Writable, + String.format("Split is not of type Writable: %s", split)); + this.inputSplit = split; + } + + public InputSplit getSplit() { + return inputSplit; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + ObjectWritable ow = new ObjectWritable(); + ow.setConf(new Configuration(false)); + ow.readFields(in); + this.inputSplit = (InputSplit) ow.get(); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + new ObjectWritable(inputSplit).write(out); + } + } + + /** + * A wrapper to allow Hadoop {@link org.apache.hadoop.conf.Configuration} to be serialized using + * Java's standard serialization mechanisms. Note that the org.apache.hadoop.conf.Configuration + * is Writable. 
+ */ + public static class SerializableConfiguration implements Externalizable { + + private Configuration conf; + + public SerializableConfiguration() {} + + public SerializableConfiguration(Configuration conf) { + this.conf = conf; + } + + public Configuration getHadoopConfiguration() { + return conf; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(conf.getClass().getCanonicalName()); + ((Writable) conf).write(out); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + String className = in.readUTF(); + try { + conf = (Configuration) Class.forName(className).newInstance(); + conf.readFields(in); + } catch (InstantiationException | IllegalAccessException e) { + throw new IOException("Unable to create configuration: " + e); + } + } + } +}
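For reference, the configuration and translation steps described in the class-level Javadoc of HadoopInputFormatIO combine into the following end-to-end sketch. MyDbInputFormat, MyDbKey, and MyDbValue are hypothetical placeholders (as in the Javadoc); the Beam and Hadoop calls are the ones defined above, and MyDbKey is assumed to be a Writable so that getDefaultCoder() can fall back to WritableCoder.

// Hypothetical sketch: MyDbInputFormat, MyDbKey and MyDbValue are illustrative placeholders.
Configuration hadoopConf = new Configuration(false);
hadoopConf.setClass("mapreduce.job.inputformat.class", MyDbInputFormat.class, InputFormat.class);
hadoopConf.setClass("key.class", MyDbKey.class, Object.class);
hadoopConf.setClass("value.class", MyDbValue.class, Object.class);

// Assume MyDbValue has no Beam Coder, so it is translated to a String.
SimpleFunction<MyDbValue, String> valueToString =
    new SimpleFunction<MyDbValue, String>() {
      @Override
      public String apply(MyDbValue input) {
        return input.toString();
      }
    };

Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
PCollection<KV<MyDbKey, String>> rows =
    p.apply("ReadFromMyDb",
        HadoopInputFormatIO.<MyDbKey, String>read()
            .withConfiguration(hadoopConf)
            .withValueTranslation(valueToString));

Per Read.expand() and getDefaultCoder() above, each key/value type must either have a Coder registered in the CoderRegistry or implement Writable; otherwise pipeline construction fails with "Cannot find coder".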
http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/package-info.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/package-info.java new file mode 100644 index 0000000..5488448 --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines transforms for reading from Data sources which implement Hadoop Input Format. + * + * @see org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO + */ +package org.apache.beam.sdk.io.hadoop.inputformat; http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java new file mode 100644 index 0000000..40f949b --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ConfigurableEmployeeInputFormat.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * A dummy InputFormat for testing reads with HadoopInputFormatIO when the InputFormat implements + * Configurable. It validates that the setConf() method is called before getSplits(). Known + * InputFormats which implement Configurable are DBInputFormat, TableInputFormat, etc. + */ +public class ConfigurableEmployeeInputFormat extends InputFormat<Text, Employee> implements + Configurable { + public boolean isConfSet = false; + + public ConfigurableEmployeeInputFormat() {} + + @Override + public Configuration getConf() { + return null; + } + + /** + * Records that the configuration has been set; getSplits() verifies this flag. + */ + @Override + public void setConf(Configuration conf) { + isConfSet = true; + } + + @Override + public RecordReader<Text, Employee> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new ConfigurableEmployeeRecordReader(); + } + + /** + * Returns an InputSplit list of {@link ConfigurableEmployeeInputSplit}. Throws an exception if + * {@link #setConf(Configuration)} has not been called. + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + if (!isConfSet) { + throw new IOException("Configuration is not set."); + } + List<InputSplit> splits = new ArrayList<InputSplit>(); + splits.add(new ConfigurableEmployeeInputSplit()); + return splits; + } + + /** + * InputSplit implementation for ConfigurableEmployeeInputFormat. + */ + public class ConfigurableEmployeeInputSplit extends InputSplit implements Writable { + + @Override + public void readFields(DataInput arg0) throws IOException {} + + @Override + public void write(DataOutput arg0) throws IOException {} + + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return null; + } + } + + /** + * RecordReader for ConfigurableEmployeeInputFormat.
+ */ + public class ConfigurableEmployeeRecordReader extends RecordReader<Text, Employee> { + + @Override + public void initialize(InputSplit paramInputSplit, TaskAttemptContext paramTaskAttemptContext) + throws IOException, InterruptedException {} + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return false; + } + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override + public Employee getCurrentValue() throws IOException, InterruptedException { + return null; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException {} + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/Employee.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/Employee.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/Employee.java new file mode 100644 index 0000000..9d4f293 --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/Employee.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; + +/** + * This class is Employee POJO class with properties- employee name and address. Used in + * {@linkplain HadoopInputFormatIO} for different unit tests. + */ +@DefaultCoder(AvroCoder.class) +public class Employee { + private String empAddress; + private String empName; + + /** + * Empty constructor required for Avro decoding. + */ + public Employee() {} + + public Employee(String empName, String empAddress) { + this.empAddress = empAddress; + this.empName = empName; + } + + public String getEmpName() { + return empName; + } + + public void setEmpName(String empName) { + this.empName = empName; + } + + public String getEmpAddress() { + return empAddress; + } + + public void setEmpAddress(String empAddress) { + this.empAddress = empAddress; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Employee employeePojo = (Employee) o; + + if (empName != null ? !empName.equals(employeePojo.empName) : employeePojo.empName != null) { + return false; + } + if (empAddress != null ? 
!empAddress.equals(employeePojo.empAddress) + : employeePojo.empAddress != null) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public String toString() { + return "Employee{" + "Name='" + empName + '\'' + ", Address=" + empAddress + '}'; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java new file mode 100644 index 0000000..206f9ab --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/EmployeeInputFormat.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This is a valid InputFormat for reading employee data, available in the form of {@code List<KV>} + * as {@linkplain EmployeeRecordReader#employeeDataList employeeDataList} . + * {@linkplain EmployeeRecordReader#employeeDataList employeeDataList} is populated using + * {@linkplain TestEmployeeDataSet#populateEmployeeData()}. + * {@linkplain EmployeeInputFormat} is used to test whether the + * {@linkplain HadoopInputFormatIO } source returns immutable records in the scenario when + * RecordReader creates new key and value objects every time it reads data. 
+ */ +public class EmployeeInputFormat extends InputFormat<Text, Employee> { + + public EmployeeInputFormat() {} + + @Override + public RecordReader<Text, Employee> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new EmployeeRecordReader(); + } + + @Override + public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException { + List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); + for (int i = 1; i <= TestEmployeeDataSet.NUMBER_OF_SPLITS; i++) { + InputSplit inputSplitObj = + new NewObjectsEmployeeInputSplit( + ((i - 1) * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT), (i + * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT - 1)); + inputSplitList.add(inputSplitObj); + } + return inputSplitList; + } + + /** + * InputSplit implementation for EmployeeInputFormat. + */ + public static class NewObjectsEmployeeInputSplit extends InputSplit implements Writable { + // Start and end map index of each split of employeeData. + private long startIndex; + private long endIndex; + + public NewObjectsEmployeeInputSplit() {} + + public NewObjectsEmployeeInputSplit(long startIndex, long endIndex) { + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + /** + * Returns number of records in each split. + */ + @Override + public long getLength() throws IOException, InterruptedException { + return this.endIndex - this.startIndex + 1; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return null; + } + + public long getStartIndex() { + return startIndex; + } + + public long getEndIndex() { + return endIndex; + } + + @Override + public void readFields(DataInput dataIn) throws IOException { + startIndex = dataIn.readLong(); + endIndex = dataIn.readLong(); + } + + @Override + public void write(DataOutput dataOut) throws IOException { + dataOut.writeLong(startIndex); + dataOut.writeLong(endIndex); + } + } + + /** + * RecordReader for EmployeeInputFormat. 
+ */ + public class EmployeeRecordReader extends RecordReader<Text, Employee> { + + private NewObjectsEmployeeInputSplit split; + private Text currentKey; + private Employee currentValue; + private long employeeListIndex = 0L; + private long recordsRead = 0L; + private List<KV<String, String>> employeeDataList; + + public EmployeeRecordReader() {} + + @Override + public void close() throws IOException {} + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public Employee getCurrentValue() throws IOException, InterruptedException { + return currentValue; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return (float) recordsRead / split.getLength(); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException, + InterruptedException { + this.split = (NewObjectsEmployeeInputSplit) split; + employeeListIndex = this.split.getStartIndex() - 1; + recordsRead = 0; + employeeDataList = TestEmployeeDataSet.populateEmployeeData(); + currentValue = new Employee(null, null); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if ((recordsRead++) >= split.getLength()) { + return false; + } + employeeListIndex++; + KV<String, String> employeeDetails = employeeDataList.get((int) employeeListIndex); + String[] empData = employeeDetails.getValue().split("_"); + /* + * New objects must be returned every time for the key and value in order to test the + * scenario discussed in the class-level javadoc. + */ + currentKey = new Text(employeeDetails.getKey()); + currentValue = new Employee(empData[0], empData[1]); + return true; + } + } +}
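Both test InputFormats above read their records from TestEmployeeDataSet, which is referenced but not included in this excerpt. A minimal sketch of the shape that helper needs is below; the constant names and the "name_address" value encoding are taken from the callers above, while the concrete values (3 splits of 5 records) and the record contents are assumptions for illustration only.

// Illustrative sketch only; the real TestEmployeeDataSet lives elsewhere in this module.
class TestEmployeeDataSet {
  public static final long NUMBER_OF_SPLITS = 3L;                 // assumed value
  public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L;  // assumed value
  private static final List<KV<String, String>> data = new ArrayList<>();

  // Returns the full employee list; each value is "<name>_<address>" so that
  // EmployeeRecordReader.nextKeyValue() can split it on "_".
  public static List<KV<String, String>> populateEmployeeData() {
    if (data.isEmpty()) {
      for (long i = 0; i < NUMBER_OF_SPLITS * NUMBER_OF_RECORDS_IN_EACH_SPLIT; i++) {
        data.add(KV.of(String.valueOf(i), "Employee" + i + "_City" + i));
      }
    }
    return data;
  }
}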

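As a usage note, these test formats are exercised through the same public API as any production InputFormat. A hedged sketch of such a test, assuming a JUnit TestPipeline named pipeline; PAssert and Count are standard Beam testing/transform utilities, and the expected record count follows from EmployeeInputFormat.getSplits().

Configuration conf = new Configuration(false);
conf.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class, InputFormat.class);
conf.setClass("key.class", Text.class, Object.class);
conf.setClass("value.class", Employee.class, Object.class);

// Text is a Writable (handled by WritableCoder) and Employee declares
// @DefaultCoder(AvroCoder.class), so no key/value translation is needed here.
PCollection<KV<Text, Employee>> employees =
    pipeline.apply(HadoopInputFormatIO.<Text, Employee>read().withConfiguration(conf));

PAssert.thatSingleton(employees.apply("Count", Count.<KV<Text, Employee>>globally()))
    .isEqualTo(TestEmployeeDataSet.NUMBER_OF_SPLITS
        * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT);
pipeline.run().waitUntilFinish();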