[ https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216366#comment-15216366 ]
ASF GitHub Bot commented on APEXMALHAR-2011: -------------------------------------------- Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57762011 --- Diff: contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java --- @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.avro; + +import java.io.IOException; +import java.io.InputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + +/** + * <p> + * Avro File Input Operator + * </p> + * A specific implementation of the AbstractFileInputOperator to read Avro + * container files.<br> + * No need to provide a schema; it is inferred from the file.<br> + * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager} + * to ensure exactly-once semantics with an HDFS-backed WAL. 
+ * + * @displayName AvroFileInputOperator + * @category Input + * @tags fs, file,avro, input operator + * @since 3.3.0 + */ +@InterfaceStability.Evolving +public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord> +{ + + private transient long offset = 0L; + + @AutoMetric + int recordCnt = 0; + + @AutoMetric + int errorCnt = 0; + + private transient DataFileStream<GenericRecord> avroDataStream; + private transient GenericRecord record = null; + + public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<String>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<String>(); + + @Override --- End diff -- Can you please add the method javadoc for openFile and describe what it does? > POJO to Avro record converter > ----------------------------- > > Key: APEXMALHAR-2011 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: devendra tagare > > We are looking to develop a record converter which would take a POJO as an > input and emit a Generic record as the output based on the given Avro schema. > The expected inputs for this operator would be, > 1. Class Name of the incoming POJO > 2. Avro schema for the Generic Record to emit. > This operator would receive an Object on its input port and emit a Generic > record on the output port. > To start with, we would handle primitive types and then go on to handle > complex types. > Thanks, > Dev -- This message was sent by Atlassian JIRA (v6.3.4#6332)