A failure to read a single record should not cause reading of the entire file to fail.
On Tue, Mar 29, 2016 at 10:18 AM, chinmaykolhatkar <[email protected]> wrote: > Github user chinmaykolhatkar commented on a diff in the pull request: > > > https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57762245 > > --- Diff: > contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java > --- > @@ -0,0 +1,159 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, > + * software distributed under the License is distributed on an > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > + * KIND, either express or implied. See the License for the > + * specific language governing permissions and limitations > + * under the License. 
> + */ > +package com.datatorrent.contrib.avro; > + > +import java.io.IOException; > +import java.io.InputStream; > + > +import org.slf4j.Logger; > +import org.slf4j.LoggerFactory; > + > +import org.apache.avro.AvroRuntimeException; > +import org.apache.avro.file.DataFileStream; > +import org.apache.avro.generic.GenericDatumReader; > +import org.apache.avro.generic.GenericRecord; > +import org.apache.avro.io.DatumReader; > +import org.apache.hadoop.classification.InterfaceStability; > +import org.apache.hadoop.fs.Path; > + > +import com.datatorrent.api.AutoMetric; > +import com.datatorrent.api.DefaultOutputPort; > +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; > +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; > + > +/** > + * <p> > + * Avro File Input Operator > + * </p> > + * A specific implementation of the AbstractFileInputOperator to read > Avro > + * container files.<br> > + * No need to provide schema,its inferred from the file<br> > + * Users can add the {@link > IdempotentStorageManager.FSIdempotentStorageManager} > + * to ensure exactly once semantics with a HDFS backed WAL. 
> + * > + * @displayName AvroFileInputOperator > + * @category Input > + * @tags fs, file,avro, input operator > + * @since 3.3.0 > + */ > +@InterfaceStability.Evolving > +public class AvroFileInputOperator extends > AbstractFileInputOperator<GenericRecord> > +{ > + > + private transient long offset = 0L; > + > + @AutoMetric > + int recordCnt = 0; > + > + @AutoMetric > + int errorCnt = 0; > + > + private transient DataFileStream<GenericRecord> avroDataStream; > + private transient GenericRecord record = null; > + > + public final transient DefaultOutputPort<GenericRecord> output = > new DefaultOutputPort<GenericRecord>(); > + > + @OutputPortFieldAnnotation(optional = true) > + public final transient DefaultOutputPort<String> completedFilesPort > = new DefaultOutputPort<String>(); > + > + @OutputPortFieldAnnotation(optional = true) > + public final transient DefaultOutputPort<String> errorRecordsPort = > new DefaultOutputPort<String>(); > + > + @Override > + protected InputStream openFile(Path path) throws IOException > + { > + InputStream is = super.openFile(path); > + if (is != null) { > + try { > + DatumReader<GenericRecord> datumReader = new > GenericDatumReader<GenericRecord>(); > + avroDataStream = new DataFileStream<GenericRecord>(is, > datumReader); > + datumReader.setSchema(avroDataStream.getSchema()); > + return is; > + } catch (NullPointerException npe) { > + LOG.error("Schemaless file - " + npe.getMessage()); > + } > + } > + return null; > --- End diff -- > > What happens when this method returns null? > You take care of following cases: > 1. How is null handled? > 2. Is there a point in returning null or probably just throw this as a > RuntimeException if the situation is not recoverable? > > > --- > If your project is set up for it, you can reply to this email and have your > reply appear on GitHub as well.
If your project does not have this feature > enabled and wishes so, or if the feature is enabled but not working, please > contact infrastructure at infrastructure@apache.org or file a JIRA ticket > with INFRA. > --- >
