Failure in reading a record should not cause a failure to read the entire
file.

On Tue, Mar 29, 2016 at 10:18 AM, chinmaykolhatkar <[email protected]>
wrote:

> Github user chinmaykolhatkar commented on a diff in the pull request:
>
>
> https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57762245
>
>     --- Diff:
> contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
> ---
>     @@ -0,0 +1,159 @@
>     +/**
>     + * Licensed to the Apache Software Foundation (ASF) under one
>     + * or more contributor license agreements.  See the NOTICE file
>     + * distributed with this work for additional information
>     + * regarding copyright ownership.  The ASF licenses this file
>     + * to you under the Apache License, Version 2.0 (the
>     + * "License"); you may not use this file except in compliance
>     + * with the License.  You may obtain a copy of the License at
>     + *
>     + *   http://www.apache.org/licenses/LICENSE-2.0
>     + *
>     + * Unless required by applicable law or agreed to in writing,
>     + * software distributed under the License is distributed on an
>     + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>     + * KIND, either express or implied.  See the License for the
>     + * specific language governing permissions and limitations
>     + * under the License.
>     + */
>     +package com.datatorrent.contrib.avro;
>     +
>     +import java.io.IOException;
>     +import java.io.InputStream;
>     +
>     +import org.slf4j.Logger;
>     +import org.slf4j.LoggerFactory;
>     +
>     +import org.apache.avro.AvroRuntimeException;
>     +import org.apache.avro.file.DataFileStream;
>     +import org.apache.avro.generic.GenericDatumReader;
>     +import org.apache.avro.generic.GenericRecord;
>     +import org.apache.avro.io.DatumReader;
>     +import org.apache.hadoop.classification.InterfaceStability;
>     +import org.apache.hadoop.fs.Path;
>     +
>     +import com.datatorrent.api.AutoMetric;
>     +import com.datatorrent.api.DefaultOutputPort;
>     +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
>     +import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
>     +
>     +/**
>     + * <p>
>     + * Avro File Input Operator
>     + * </p>
>     + * A specific implementation of the AbstractFileInputOperator to read
> Avro
>     + * container files.<br>
>     + * No need to provide a schema; it's inferred from the file<br>
>     + * Users can add the {@link
> IdempotentStorageManager.FSIdempotentStorageManager}
>     + * to ensure exactly once semantics with a HDFS backed WAL.
>     + *
>     + * @displayName AvroFileInputOperator
>     + * @category Input
>     + * @tags fs, file, avro, input operator
>     + * @since 3.3.0
>     + */
>     [email protected]
>     +public class AvroFileInputOperator extends
> AbstractFileInputOperator<GenericRecord>
>     +{
>     +
>     +  private transient long offset = 0L;
>     +
>     +  @AutoMetric
>     +  int recordCnt = 0;
>     +
>     +  @AutoMetric
>     +  int errorCnt = 0;
>     +
>     +  private transient DataFileStream<GenericRecord> avroDataStream;
>     +  private transient GenericRecord record = null;
>     +
>     +  public final transient DefaultOutputPort<GenericRecord> output =
> new DefaultOutputPort<GenericRecord>();
>     +
>     +  @OutputPortFieldAnnotation(optional = true)
>     +  public final transient DefaultOutputPort<String> completedFilesPort
> = new DefaultOutputPort<String>();
>     +
>     +  @OutputPortFieldAnnotation(optional = true)
>     +  public final transient DefaultOutputPort<String> errorRecordsPort =
> new DefaultOutputPort<String>();
>     +
>     +  @Override
>     +  protected InputStream openFile(Path path) throws IOException
>     +  {
>     +    InputStream is = super.openFile(path);
>     +    if (is != null) {
>     +      try {
>     +        DatumReader<GenericRecord> datumReader = new
> GenericDatumReader<GenericRecord>();
>     +        avroDataStream = new DataFileStream<GenericRecord>(is,
> datumReader);
>     +        datumReader.setSchema(avroDataStream.getSchema());
>     +        return is;
>     +      } catch (NullPointerException npe) {
>     +        LOG.error("Schemaless file - " + npe.getMessage());
>     +      }
>     +    }
>     +    return null;
>     --- End diff --
>
>     What happens when this method returns null?
>     Have you taken care of the following cases?
>     1. How is the null return value handled by the caller?
>     2. Is there a point in returning null or probably just throw this as a
> RuntimeException if the situation is not recoverable?
>
>
> ---
> If your project is set up for it, you can reply to this email and have your
> reply appear on GitHub as well. If your project does not have this feature
> enabled and wishes so, or if the feature is enabled but not working, please
> contact infrastructure at [email protected] or file a JIRA ticket
> with INFRA.
> ---
>

Reply via email to