Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 37f813092 -> 802240045
APEXMALHAR-2011-2012 read & convert Avro to Pojo Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5075ce0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5075ce0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5075ce0e Branch: refs/heads/devel-3 Commit: 5075ce0ef75afccdff2edf4c044465340176a148 Parents: a23cc5b Author: Devendra Tagare <[email protected]> Authored: Tue Mar 15 20:09:06 2016 +0530 Committer: devtagare <[email protected]> Committed: Thu Mar 31 12:45:14 2016 -0700 ---------------------------------------------------------------------- .../contrib/avro/AvroFileInputOperator.java | 168 +++++++ .../contrib/avro/AvroRecordHelper.java | 123 +++++ .../datatorrent/contrib/avro/AvroToPojo.java | 411 +++++++++++++++++ .../datatorrent/contrib/avro/PojoToAvro.java | 273 +++++++++++ .../contrib/avro/AvroFileInputOperatorTest.java | 448 +++++++++++++++++++ .../contrib/avro/AvroToPojoTest.java | 325 ++++++++++++++ .../contrib/avro/PojoToAvroTest.java | 236 ++++++++++ 7 files changed, 1984 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java new file mode 100644 index 0000000..14dfdf2 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.avro; + +import java.io.IOException; +import java.io.InputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; + +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + +/** + * <p> + * Avro File Input Operator + * </p> + * A specific implementation of the AbstractFileInputOperator to read Avro + * container files.<br> + * No need to provide schema,its inferred from the file<br> + * This operator emits a GenericRecord based on the schema derived from the + * input file<br> + * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager} + * to ensure exactly once semantics with a HDFS backed WAL. 
+ * + * @displayName AvroFileInputOperator + * @category Input + * @tags fs, file,avro, input operator + */ [email protected] +public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord> +{ + + private transient long offset = 0L; + + @AutoMetric + @VisibleForTesting + int recordCount = 0; + + @AutoMetric + @VisibleForTesting + int errorCount = 0; + + private transient DataFileStream<GenericRecord> avroDataStream; + + public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>(); + + public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<String>(); + + public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<String>(); + + /** + * Returns a input stream given a file path + * + * @param path + * @return InputStream + * @throws IOException + */ + @Override + protected InputStream openFile(Path path) throws IOException + { + InputStream is = super.openFile(path); + if (is != null) { + DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(); + avroDataStream = new DataFileStream<GenericRecord>(is, datumReader); + datumReader.setSchema(avroDataStream.getSchema()); + } + return is; + } + + /** + * Reads a GenericRecord from the given input stream<br> + * Emits the FileName,Offset,Exception on the error port if its connected + * + * @return GenericRecord + */ + @Override + protected GenericRecord readEntity() throws IOException + { + GenericRecord record = null; + + record = null; + + try { + if (avroDataStream != null && avroDataStream.hasNext()) { + offset++; + record = avroDataStream.next(); + recordCount++; + return record; + } + } catch (AvroRuntimeException are) { + LOG.error("Exception in parsing record for file - " + super.currentFile + " at offset - " + offset, are); + if (errorRecordsPort.isConnected()) { + errorRecordsPort.emit("FileName:" + super.currentFile + ", Offset:" + offset); + } + errorCount++; 
+ throw new AvroRuntimeException(are); + } + return record; + } + + /** + * Closes the input stream If the completed files port is connected, the + * completed file is emitted from this port + */ + @Override + protected void closeFile(InputStream is) throws IOException + { + String fileName = super.currentFile; + + if (avroDataStream != null) { + avroDataStream.close(); + } + + super.closeFile(is); + + if (completedFilesPort.isConnected()) { + completedFilesPort.emit(fileName); + } + offset = 0; + } + + @Override + protected void emit(GenericRecord tuple) + { + if (tuple != null) { + output.emit(tuple); + } + } + + @Override + public void beginWindow(long windowId) + { + errorCount = 0; + recordCount = 0; + } + + private static final Logger LOG = LoggerFactory.getLogger(AvroFileInputOperator.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java new file mode 100644 index 0000000..8f29b86 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroRecordHelper.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.avro; + +import java.text.ParseException; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; + +/** + * This is an utility class for reading Avro converted records.<br> + * This class can be used with the {@link PojoToAvro} or in isolation to get Avro values. + */ +public class AvroRecordHelper +{ + + /** + * Convert a passed String value to the given type for the key as per Schema + */ + public static Object convertValueStringToAvroKeyType(Schema schema, String key, String value) throws ParseException + { + Type type = null; + + if (schema.getField(key) != null) { + type = schema.getField(key).schema().getType(); + } else { + return value; + } + + Object convertedValue = null; + + if (type == Type.UNION) { + convertedValue = convertAndResolveUnionToPrimitive(schema, key, value); + } else { + convertedValue = convertValueToAvroPrimitive(type, key, value); + } + + return convertedValue; + + } + + private static Object convertValueToAvroPrimitive(Type type, String key, String value) throws ParseException + { + Object newValue = value; + switch (type) { + case BOOLEAN: + newValue = Boolean.parseBoolean(value); + break; + case DOUBLE: + newValue = Double.parseDouble(value); + break; + case FLOAT: + newValue = Float.parseFloat(value); + break; + case INT: + newValue = Integer.parseInt(value); + break; + case LONG: + newValue = Long.parseLong(value); + break; + case BYTES: + newValue = value.getBytes(); + break; + 
case STRING: + newValue = value; + break; + case NULL: + newValue = null; + break; + default: + newValue = value; + } + return newValue; + } + + private static Object convertAndResolveUnionToPrimitive(Schema schema, String key, String value) throws ParseException + { + Schema unionSchema = schema.getField(key).schema(); + List<Schema> types = unionSchema.getTypes(); + Object convertedValue = null; + for (int i = 0; i < types.size(); i++) { + try { + if (types.get(i).getType() == Type.NULL) { + if (value == null || value.equals("null")) { + convertedValue = null; + break; + } else { + continue; + } + } + convertedValue = convertValueToAvroPrimitive(types.get(i).getType(), key, value); + } catch (RuntimeException e) { + LOG.error("Could not handle schema resolution", e); + continue; + } + break; + } + + return convertedValue; + } + + private static final Logger LOG = LoggerFactory.getLogger(AvroRecordHelper.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java new file mode 100644 index 0000000..7fa0936 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java @@ -0,0 +1,411 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.contrib.avro;

import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
import com.datatorrent.lib.util.PojoUtils;

/**
 * <p>
 * AvroToPojo
 * </p>
 * A generic implementation for conversion from Avro to POJO. The POJO class
 * name &amp; field mapping should be provided by the user.<br>
 * If this mapping is not provided then reflection is used to determine this
 * mapping.<br>
 * As of now only primitive types are supported.<br>
 * Error records are emitted on the errorPort if connected
 *
 * @displayName Avro To Pojo
 * @category Converter
 * @tags avro
 */
@InterfaceStability.Evolving
public class AvroToPojo extends BaseOperator
{

  private transient Class<?> pojoClass;

  private static final String FIELD_SEPARATOR = ":";
  private static final String RECORD_SEPARATOR = ",";

  private String genericRecordToPOJOFieldsMapping = null;

  private List<FieldInfo> fieldInfos;

  private List<ActiveFieldInfo> columnFieldSetters;

  // Per-window count of records successfully converted to POJOs.
  @AutoMetric
  @VisibleForTesting
  int recordCount = 0;

  // Per-window count of records that could not be converted.
  @AutoMetric
  @VisibleForTesting
  int errorCount = 0;

  // Per-window count of individual field conversion failures.
  @AutoMetric
  @VisibleForTesting
  int fieldErrorCount = 0;

  public final transient DefaultOutputPort<GenericRecord> errorPort = new DefaultOutputPort<GenericRecord>();

  /**
   * Returns a string representing mapping from generic record to POJO fields
   */
  public String getGenericRecordToPOJOFieldsMapping()
  {
    return genericRecordToPOJOFieldsMapping;
  }

  /**
   * Comma separated list mapping a field in Avro schema to POJO field eg :
   * orderId:orderId:INTEGER,total:total:DOUBLE
   */
  public void setGenericRecordToPOJOFieldsMapping(String genericRecordToPOJOFieldsMapping)
  {
    this.genericRecordToPOJOFieldsMapping = genericRecordToPOJOFieldsMapping;
  }

  public final transient DefaultInputPort<GenericRecord> data = new DefaultInputPort<GenericRecord>()
  {
    @Override
    public void process(GenericRecord tuple)
    {
      processTuple(tuple);
    }
  };

  /**
   * Converts the given GenericRecord to a POJO and emits it; on failure the
   * incoming record is emitted on the error port if connected.
   */
  protected void processTuple(GenericRecord tuple)
  {
    try {
      Object obj = getPOJOFromGenericRecord(tuple);

      if (obj != null) {
        output.emit(obj);
        recordCount++;
      } else if (errorPort.isConnected()) {
        errorPort.emit(tuple);
        errorCount++;
      }

    } catch (InstantiationException | IllegalAccessException e) {
      // Report the POJO class that failed to instantiate, not the operator class.
      LOG.error("Could not initialize object of class -" + getPojoClass().getName(), e);
      errorCount++;
    }
  }

  /**
   * Builds a POJO from a GenericRecord by applying each configured field
   * setter. Missing or unconvertible fields are skipped and counted in
   * fieldErrorCount; a failure affecting the whole record yields null.
   *
   * @return the populated POJO, or null when conversion failed entirely
   */
  @SuppressWarnings("unchecked")
  private Object getPOJOFromGenericRecord(GenericRecord tuple) throws InstantiationException, IllegalAccessException
  {
    Object newObj = getPojoClass().newInstance();

    try {
      for (int i = 0; i < columnFieldSetters.size(); i++) {

        AvroToPojo.ActiveFieldInfo afi = columnFieldSetters.get(i);
        SupportType st = afi.fieldInfo.getType();
        Object val = null;

        try {
          val = tuple.get(afi.fieldInfo.getColumnName());
        } catch (Exception e) {
          LOG.error("Could not find field -" + afi.fieldInfo.getColumnName() + "- in the generic record", e);
          val = null;
          fieldErrorCount++;
        }

        if (val == null) {
          continue;
        }

        try {
          switch (st) {
            case BOOLEAN:
              ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(newObj,
                  (boolean)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case DOUBLE:
              ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(newObj,
                  (double)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case FLOAT:
              ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(newObj,
                  (float)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case INTEGER:
              ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(newObj,
                  (int)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case STRING:
              // Avro strings may be Utf8 instances; toString() normalizes them.
              ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(newObj,
                  tuple.get(afi.fieldInfo.getColumnName()).toString());
              break;

            case LONG:
              ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(newObj,
                  (long)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            default:
              throw new AvroRuntimeException("Invalid Support Type");

          }
        } catch (AvroRuntimeException e) {
          LOG.error("Exception in setting value", e);
          fieldErrorCount++;
        }

      }
    } catch (Exception ex) {
      // Preserve the stack trace; the whole record failed, so discard the partial POJO.
      LOG.error("Generic Exception in setting value", ex);
      errorCount++;
      newObj = null;
    }
    return newObj;
  }

  /**
   * Use reflection to generate field info values if the user has not provided
   * the inputs mapping
   *
   * @return String representing the POJO field to Avro field mapping
   */
  private String generateFieldInfoInputs(Class<?> cls)
  {
    java.lang.reflect.Field[] fields = cls.getDeclaredFields();
    StringBuilder sb = new StringBuilder();

    for (int i = 0; i < fields.length; i++) {
      java.lang.reflect.Field f = fields[i];
      Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
      sb.append(f.getName()).append(FIELD_SEPARATOR).append(f.getName()).append(FIELD_SEPARATOR)
          .append(c.getSimpleName().toUpperCase()).append(RECORD_SEPARATOR);
    }
    // Drop the trailing record separator.
    return sb.substring(0, sb.length() - 1);
  }

  /**
   * Creates a map representing fieldName in POJO:field in Generic Record:Data
   * type
   *
   * @return List of FieldInfo
   */
  private List<FieldInfo> createFieldInfoMap(String str)
  {
    fieldInfos = new ArrayList<FieldInfo>();
    StringTokenizer strtok = new StringTokenizer(str, RECORD_SEPARATOR);

    while (strtok.hasMoreTokens()) {
      String[] token = strtok.nextToken().split(FIELD_SEPARATOR);
      try {
        fieldInfos.add(new FieldInfo(token[0], token[1], SupportType.valueOf(token[2])));
      } catch (Exception e) {
        LOG.error("Invalid support type", e);
      }
    }
    return fieldInfos;
  }

  @OutputPortFieldAnnotation(schemaRequired = true)
  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
  {
    public void setup(PortContext context)
    {
      setPojoClass(context.getValue(Context.PortContext.TUPLE_CLASS));

      columnFieldSetters = Lists.newArrayList();

      /**
       * Check if the mapping of Generic record fields to POJO is given, else
       * use reflection
       */
      if (getGenericRecordToPOJOFieldsMapping() == null) {
        setFieldInfos(createFieldInfoMap(generateFieldInfoInputs(getPojoClass())));
      } else {
        setFieldInfos(createFieldInfoMap(getGenericRecordToPOJOFieldsMapping()));
      }

      initColumnFieldSetters(getFieldInfos());
      initializeActiveFieldSetters();
    }
  };

  @Override
  public void endWindow()
  {
    // Metrics are per-window; reset them for the next window.
    errorCount = 0;
    fieldErrorCount = 0;
    recordCount = 0;

  }

  private Class<?> getPojoClass()
  {
    return pojoClass;
  }

  public void setPojoClass(Class<?> pojoClass)
  {
    this.pojoClass = pojoClass;
  }

  /**
   * Class that maps fieldInfo to its getters or setters
   */
  protected static class ActiveFieldInfo
  {
    final FieldInfo fieldInfo;
    Object setterOrGetter;

    ActiveFieldInfo(FieldInfo fieldInfo)
    {
      this.fieldInfo = fieldInfo;
    }

  }

  /**
   * A list of {@link FieldInfo}s where each item maps a column name to a pojo
   * field name.
   */
  private List<FieldInfo> getFieldInfos()
  {
    return fieldInfos;
  }

  /**
   * Add the Active Fields to the columnFieldSetters {@link ActiveFieldInfo}s
   */
  private void initColumnFieldSetters(List<FieldInfo> fieldInfos)
  {
    for (FieldInfo fi : fieldInfos) {
      if (columnFieldSetters == null) {
        columnFieldSetters = Lists.newArrayList();
      }
      columnFieldSetters.add(new AvroToPojo.ActiveFieldInfo(fi));
    }
  }

  /**
   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a
   * pojo field name.<br/>
   * The value from fieldInfo.column is assigned to
   * fieldInfo.pojoFieldExpression.
   *
   * @description $[].columnName name of the Output Field in POJO
   * @description $[].pojoFieldExpression expression to get the respective field
   *              from generic record
   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
   */
  private void setFieldInfos(List<FieldInfo> fieldInfos)
  {
    this.fieldInfos = fieldInfos;
  }

  /**
   * Initialize the setters for generating the POJO
   */
  private void initializeActiveFieldSetters()
  {
    for (int i = 0; i < columnFieldSetters.size(); i++) {
      ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i);

      SupportType st = activeFieldInfo.fieldInfo.getType();

      switch (st) {

        case BOOLEAN:

          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case DOUBLE:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case FLOAT:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case INTEGER:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case STRING:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression(), activeFieldInfo.fieldInfo.getType().getJavaType());
          break;

        case LONG:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        default:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Byte.class);
          break;
      }
      // activeFieldInfo is the element at index i, so no write-back is needed.
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(AvroToPojo.class);

}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java new file mode 100644 index 0000000..dc90800 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.avro; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.annotations.VisibleForTesting; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; + +/** + * <p> + * PojoToAvro + * </p> + * A generic implementation for POJO to Avro conversion. 
A POJO is converted to + * a GenericRecord based on the schema provided.<br> + * As of now only primitive types are supported.<br> + * Error records are emitted on the errorPort if connected + * + * @displayName Pojo To Avro + * @category Converter + * @tags avro + */ [email protected] +public class PojoToAvro extends BaseOperator +{ + + private List<Field> columnNames; + + private Class<?> cls; + + private List<Getter> keyMethodMap; + + private transient String schemaString; + + private transient Schema schema; + + @AutoMetric + @VisibleForTesting + int recordCount = 0; + + @AutoMetric + @VisibleForTesting + int errorCount = 0; + + @AutoMetric + @VisibleForTesting + int fieldErrorCount = 0; + + public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>(); + + public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort<Object>(); + + private void parseSchema() throws IOException + { + setSchema(new Schema.Parser().parse(getSchemaString())); + } + + /** + * Returns the schema string for Avro Generic Record + * + * @return schemaString + */ + public String getSchemaString() + { + return schemaString; + } + + /** + * Sets the schema string + */ + public void setSchemaString(String schemaString) + { + this.schemaString = schemaString; + } + + /** + * Returns the schema object + * + * @return schema + */ + private Schema getSchema() + { + return schema; + } + + /** + * Sets the shcema object + */ + private void setSchema(Schema schema) + { + this.schema = schema; + } + + /** + * Returns the list for field names from provided Avro schema + * + * @return List of Fields + */ + private List<Field> getColumnNames() + { + return columnNames; + } + + /** + * Sets the list of column names representing the fields in Avro schema + */ + private void setColumnNames(List<Field> columnNames) + { + this.columnNames = columnNames; + } + + /** + * This method generates the getters for provided field of a given class + * + 
* @return Getter + */ + private Getter<?, ?> generateGettersForField(Class<?> cls, String inputFieldName) + throws NoSuchFieldException, SecurityException + { + java.lang.reflect.Field f = cls.getDeclaredField(inputFieldName); + Class<?> c = ClassUtils.primitiveToWrapper(f.getType()); + + Getter<?, ?> classGetter = PojoUtils.createGetter(cls, inputFieldName, c); + + return classGetter; + } + + /** + * Initializes the list of columns in POJO based on the names from schema + */ + private void initializeColumnMap(Schema schema) + { + setColumnNames(schema.getFields()); + + keyMethodMap = new ArrayList<Getter>(); + for (int i = 0; i < getColumnNames().size(); i++) { + try { + keyMethodMap.add(generateGettersForField(cls, getColumnNames().get(i).name())); + } catch (NoSuchFieldException | SecurityException e) { + throw new RuntimeException("Failed to initialize pojo class getters for field - ", e); + } + } + } + + @InputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>() + { + + @Override + public void setup(PortContext context) + { + cls = context.getValue(Context.PortContext.TUPLE_CLASS); + + try { + parseSchema(); + initializeColumnMap(getSchema()); + } catch (IOException e) { + LOG.error("Exception in parsing schema", e); + } + } + + @Override + public void process(Object tuple) + { + processTuple(tuple); + } + + }; + + /** + * Converts incoming tuples into Generic records + */ + protected void processTuple(Object tuple) + { + GenericRecord record = null; + + try { + record = getGenericRecord(tuple); + } catch (Exception e) { + LOG.error("Exception in parsing record"); + errorCount++; + } + + if (record != null) { + output.emit(record); + recordCount++; + } else if (errorPort.isConnected()) { + errorPort.emit(tuple); + errorCount++; + } + } + + /** + * Returns a generic record mapping the POJO fields to provided schema + * + * @return Generic Record + */ + private GenericRecord 
getGenericRecord(Object tuple) throws Exception + { + int writeErrorCount = 0; + GenericRecord rec = new GenericData.Record(getSchema()); + + for (int i = 0; i < columnNames.size(); i++) { + try { + rec.put(columnNames.get(i).name(), AvroRecordHelper.convertValueStringToAvroKeyType(getSchema(), + columnNames.get(i).name(), keyMethodMap.get(i).get(tuple).toString())); + } catch (AvroRuntimeException e) { + LOG.error("Could not set Field [" + columnNames.get(i).name() + "] in the generic record", e); + fieldErrorCount++; + } catch (Exception e) { + LOG.error("Parse Exception", e); + fieldErrorCount++; + writeErrorCount++; + } + } + + if (columnNames.size() == writeErrorCount) { + errorCount++; + return null; + } else { + return rec; + } + } + + @Override + public void beginWindow(long windowId) + { + recordCount = 0; + errorCount = 0; + fieldErrorCount = 0; + } + + private static final Logger LOG = LoggerFactory.getLogger(PojoToAvro.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java new file mode 100644 index 0000000..813f189 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileInputOperatorTest.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.avro; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.python.google.common.collect.Lists; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Sets; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * <p> + * In this class the emitTuples method is called twice to process the first + * input, since on begin window 0 the operator is setup & stream is initialized. 
+ * The platform calls the emitTuples method in the successive windows + * </p> + */ +public class AvroFileInputOperatorTest +{ + + private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + "" + + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\"," + + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"}," + + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}"; + + private static final String FILENAME = "/tmp/simpleorder.avro"; + private static final String OTHER_FILE = "/tmp/simpleorder2.avro"; + private static final String ERROR_FILE = "/tmp/errorFile.avro"; + + CollectorTestSink<Object> output = new CollectorTestSink<Object>(); + + CollectorTestSink<Object> completedFilesPort = new CollectorTestSink<Object>(); + + CollectorTestSink<Object> errorRecordsPort = new CollectorTestSink<Object>(); + + AvroFileInputOperator avroFileInput = new AvroFileInputOperator(); + + private List<GenericRecord> recordList = null; + + public static class TestMeta extends TestWatcher + { + public String dir = null; + Context.OperatorContext context; + Context.PortContext portContext; + + @Override + protected void starting(org.junit.runner.Description description) + { + String methodName = description.getMethodName(); + String className = description.getClassName(); + this.dir = "target/" + className + "/" + methodName; + Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(Context.DAGContext.APPLICATION_PATH, dir); + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class); + portContext = new TestPortContext(portAttributes); + } + + @Override + protected void finished(Description description) + { + try { + 
FileUtils.deleteDirectory(new File("target/" + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSingleFileAvroReads() throws Exception + { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + + int cnt = 7; + createAvroInput(cnt); + writeAvroFile(new File(FILENAME)); + + avroFileInput.output.setSink(output); + avroFileInput.completedFilesPort.setSink(completedFilesPort); + avroFileInput.errorRecordsPort.setSink(errorRecordsPort); + avroFileInput.setDirectory(testMeta.dir); + avroFileInput.setup(testMeta.context); + + avroFileInput.beginWindow(0); + avroFileInput.emitTuples(); + avroFileInput.emitTuples(); + Assert.assertEquals("Record count", cnt, avroFileInput.recordCount); + avroFileInput.endWindow(); + Assert.assertEquals("number tuples", cnt, output.collectedTuples.size()); + Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size()); + Assert.assertEquals("Completed File", 1, completedFilesPort.collectedTuples.size()); + avroFileInput.teardown(); + + } + + @Test + public void testMultipleFileAvroReads() throws Exception + { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + + int cnt = 7; + + createAvroInput(cnt); + + writeAvroFile(new File(FILENAME)); + writeAvroFile(new File(OTHER_FILE)); + + avroFileInput.output.setSink(output); + avroFileInput.completedFilesPort.setSink(completedFilesPort); + avroFileInput.errorRecordsPort.setSink(errorRecordsPort); + avroFileInput.setDirectory(testMeta.dir); + avroFileInput.setup(testMeta.context); + + avroFileInput.beginWindow(0); + avroFileInput.emitTuples(); + avroFileInput.beginWindow(1); + avroFileInput.emitTuples(); + + Assert.assertEquals("number tuples after window 0", cnt, output.collectedTuples.size()); + + avroFileInput.emitTuples(); + 
avroFileInput.endWindow(); + + Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size()); + Assert.assertEquals("number tuples after window 1", 2 * cnt, output.collectedTuples.size()); + Assert.assertEquals("Completed File", 2, completedFilesPort.collectedTuples.size()); + + avroFileInput.teardown(); + + } + + @Test + public void testInvalidFormatFailure() throws Exception + { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + + int cnt = 7; + writeErrorFile(cnt, new File(ERROR_FILE)); + + avroFileInput.output.setSink(output); + avroFileInput.setDirectory(testMeta.dir); + avroFileInput.setup(testMeta.context); + + avroFileInput.beginWindow(0); + avroFileInput.emitTuples(); + avroFileInput.emitTuples(); + avroFileInput.endWindow(); + + Assert.assertEquals("number tuples after window 1", 0, output.collectedTuples.size()); + avroFileInput.teardown(); + } + + private void createAvroInput(int cnt) + { + recordList = Lists.newArrayList(); + + while (cnt > 0) { + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA)); + rec.put("orderId", cnt * 1L); + rec.put("customerId", cnt * 2); + rec.put("total", cnt * 1.5); + rec.put("customerName", "*" + cnt + "*"); + cnt--; + recordList.add(rec); + } + } + + private void writeErrorFile(int cnt, File errorFile) throws IOException + { + List<String> allLines = Lists.newArrayList(); + HashSet<String> lines = Sets.newHashSet(); + for (int line = 0; line < 5; line++) { + lines.add("f0" + "l" + line); + } + + allLines.addAll(lines); + + FileUtils.write(errorFile, StringUtils.join(lines, '\n')); + + FileUtils.moveFileToDirectory(new File(errorFile.getAbsolutePath()), new File(testMeta.dir), true); + } + + private void writeAvroFile(File outputFile) throws IOException + { + + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>( + new Schema.Parser().parse(AVRO_SCHEMA)); + + DataFileWriter<GenericRecord> 
dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); + dataFileWriter.create(new Schema.Parser().parse(AVRO_SCHEMA), outputFile); + + for (GenericRecord record : recordList) { + dataFileWriter.append(record); + } + + dataFileWriter.close(); + + FileUtils.moveFileToDirectory(new File(outputFile.getAbsolutePath()), new File(testMeta.dir), true); + + } + + @Test + public void testApplication() throws IOException, Exception + { + try { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + int cnt = 7; + createAvroInput(cnt); + writeAvroFile(new File(FILENAME)); + createAvroInput(cnt - 2); + writeAvroFile(new File(OTHER_FILE)); + avroFileInput.setDirectory(testMeta.dir); + + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + + AvroReaderApplication avroReaderApplication = new AvroReaderApplication(); + avroReaderApplication.setAvroFileInputOperator(avroFileInput); + lma.prepareDAG(avroReaderApplication, conf); + + LocalMode.Controller lc = lma.getController(); + lc.run(10000);// runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + @Test + public void testApplicationWithPojoConversion() throws IOException, Exception + { + try { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + int cnt = 7; + createAvroInput(cnt); + writeAvroFile(new File(FILENAME)); + createAvroInput(cnt - 2); + writeAvroFile(new File(OTHER_FILE)); + + avroFileInput.setDirectory(testMeta.dir); + + AvroToPojo avroToPojo = new AvroToPojo(); + avroToPojo.setPojoClass(SimpleOrder.class); + + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + + AvroToPojoApplication avroToPojoApplication = new AvroToPojoApplication(); + avroToPojoApplication.setAvroFileInputOperator(avroFileInput); + 
avroToPojoApplication.setAvroToPojo(avroToPojo); + + lma.prepareDAG(avroToPojoApplication, conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000);// runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class AvroReaderApplication implements StreamingApplication + { + + AvroFileInputOperator avroFileInputOperator; + + public AvroFileInputOperator getAvroFileInput() + { + return avroFileInputOperator; + } + + public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator) + { + this.avroFileInputOperator = avroFileInputOperator; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + AvroFileInputOperator avroInputOperator = dag.addOperator("avroInputOperator", getAvroFileInput()); + ConsoleOutputOperator consoleOutput = dag.addOperator("GenericRecordOp", new ConsoleOutputOperator()); + dag.addStream("pojo", avroInputOperator.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL); + } + + } + + public static class AvroToPojoApplication implements StreamingApplication + { + + AvroFileInputOperator avroFileInputOperator; + AvroToPojo avroToPojo; + + public AvroFileInputOperator getAvroFileInput() + { + return avroFileInputOperator; + } + + public void setAvroFileInputOperator(AvroFileInputOperator avroFileInputOperator) + { + this.avroFileInputOperator = avroFileInputOperator; + } + + public void setAvroToPojo(AvroToPojo avroToPojo) + { + this.avroToPojo = avroToPojo; + } + + public AvroToPojo getAvroToPojo() + { + return avroToPojo; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + AvroFileInputOperator avroInputOperator = dag.addOperator("avroInputOperator", getAvroFileInput()); + AvroToPojo avroToPojo = dag.addOperator("AvroToPojo", getAvroToPojo()); + ConsoleOutputOperator consoleOutput = dag.addOperator("GenericRecordOp", new 
ConsoleOutputOperator()); + dag.getMeta(avroToPojo).getMeta(avroToPojo.output).getAttributes().put(Context.PortContext.TUPLE_CLASS, + SimpleOrder.class); + + dag.addStream("GenericRecords", avroInputOperator.output, avroToPojo.data).setLocality(Locality.THREAD_LOCAL); + dag.addStream("POJO", avroToPojo.output, consoleOutput.input).setLocality(Locality.CONTAINER_LOCAL); + } + + } + + public static class SimpleOrder + { + + private Integer customerId; + private Long orderId; + private Double total; + private String customerName; + + public SimpleOrder() + { + } + + public SimpleOrder(int customerId, long orderId, double total, String customerName) + { + setCustomerId(customerId); + setOrderId(orderId); + setTotal(total); + setCustomerName(customerName); + } + + public String getCustomerName() + { + return customerName; + } + + public void setCustomerName(String customerName) + { + this.customerName = customerName; + } + + public Integer getCustomerId() + { + return customerId; + } + + public void setCustomerId(Integer customerId) + { + this.customerId = customerId; + } + + public Long getOrderId() + { + return orderId; + } + + public void setOrderId(Long orderId) + { + this.orderId = orderId; + } + + public Double getTotal() + { + return total; + } + + public void setTotal(Double total) + { + this.total = total; + } + + @Override + public String toString() + { + return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName=" + + customerName + "]"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java new file mode 100644 index 0000000..23714a3 --- /dev/null +++ 
b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroToPojoTest.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.avro; + +import java.util.List; +import java.util.ListIterator; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.python.google.common.collect.Lists; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.testbench.CollectorTestSink; + +public class AvroToPojoTest +{ + public static final String fieldInfoInitMap = "orderId:orderId:LONG," + "customerId:customerId:INTEGER," + + "customerName:customerName:STRING," + "total:total:DOUBLE"; + + public static final String byteFieldInfoInitMap = "orderId:orderId:LONG," + "customerId:customerId:INTEGER," + + "customerName:customerName:BYTES," + "total:total:DOUBLE"; + + private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + "" + + 
"\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\"," + + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"}," + + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}"; + + private static final String AVRO_SCHEMA_FOR_BYTES = "{\"namespace\":\"abc\"," + "" + + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\"," + + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"}," + + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"bytes\"}]}"; + + CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>(); + AvroToPojo avroReader = new AvroToPojo(); + + private List<GenericRecord> recordList = null; + + public class TestMeta extends TestWatcher + { + Context.OperatorContext context; + Context.PortContext portContext; + + @Override + protected void starting(org.junit.runner.Description description) + { + Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class); + portContext = new TestPortContext(portAttributes); + super.starting(description); + avroReader.output.setSink(outputSink); + createReaderInput(); + } + + @Override + protected void finished(Description description) + { + avroReader.teardown(); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testAvroReads() throws Exception + { + + avroReader.setPojoClass(SimpleOrder.class); + avroReader.setGenericRecordToPOJOFieldsMapping(fieldInfoInitMap); + avroReader.output.setup(testMeta.portContext); + avroReader.setup(testMeta.context); + + avroReader.beginWindow(0); + + ListIterator<GenericRecord> itr = recordList.listIterator(); + + while (itr.hasNext()) { + avroReader.data.process(itr.next()); + } + + avroReader.endWindow(); + 
Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size()); + avroReader.teardown(); + + } + + @Test + public void testAvroReadsInvalidDataType() throws Exception + { + + avroReader.setPojoClass(SimpleOrder.class); + avroReader.setGenericRecordToPOJOFieldsMapping(byteFieldInfoInitMap); + avroReader.output.setup(testMeta.portContext); + avroReader.setup(testMeta.context); + + avroReader.beginWindow(0); + + ListIterator<GenericRecord> itr = recordList.listIterator(); + + while (itr.hasNext()) { + avroReader.data.process(itr.next()); + } + + avroReader.endWindow(); + Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size()); + avroReader.teardown(); + + } + + @Test + public void testAvroReadsWithReflection() throws Exception + { + + avroReader.setPojoClass(SimpleOrder.class); + avroReader.output.setup(testMeta.portContext); + avroReader.setup(testMeta.context); + + avroReader.beginWindow(0); + + ListIterator<GenericRecord> itr = recordList.listIterator(); + + while (itr.hasNext()) { + avroReader.data.process(itr.next()); + } + + avroReader.endWindow(); + Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size()); + avroReader.teardown(); + + } + + @Test + public void testReadFailures() throws Exception + { + + avroReader.setPojoClass(SimpleOrder.class); + avroReader.setGenericRecordToPOJOFieldsMapping(fieldInfoInitMap); + avroReader.output.setup(testMeta.portContext); + avroReader.setup(testMeta.context); + + avroReader.beginWindow(0); + + ListIterator<GenericRecord> itr = recordList.listIterator(); + + while (itr.hasNext()) { + GenericRecord rec = itr.next(); + rec.put("orderId", "abc"); + avroReader.data.process(rec); + } + + Assert.assertEquals("Number of tuples", 3, avroReader.errorCount); + avroReader.endWindow(); + avroReader.teardown(); + + } + + @Test + public void testReadFieldFailures() throws Exception + { + + int cnt = 3; + + avroReader.setPojoClass(SimpleOrder.class); + 
avroReader.setGenericRecordToPOJOFieldsMapping(fieldInfoInitMap); + avroReader.output.setup(testMeta.portContext); + avroReader.setup(testMeta.context); + + avroReader.beginWindow(0); + + for (int i = 0; i < cnt; i++) { + avroReader.data.process(null); + } + + Assert.assertEquals("Number of tuples", 12, avroReader.fieldErrorCount); + + avroReader.endWindow(); + avroReader.teardown(); + + } + + private void createReaderInput() + { + int cnt = 3; + + recordList = Lists.newArrayList(); + + while (cnt > 0) { + GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA)); + rec.put("orderId", cnt * 1L); + rec.put("customerId", cnt * 2); + rec.put("total", cnt * 1.5); + rec.put("customerName", "*" + cnt + "*"); + cnt--; + recordList.add(rec); + } + } + + public static class SimpleOrder + { + + private Integer customerId; + private Long orderId; + private Double total; + private String customerName; + + public SimpleOrder() + { + } + + public SimpleOrder(int customerId, long orderId, double total, String customerName) + { + setCustomerId(customerId); + setOrderId(orderId); + setTotal(total); + setCustomerName(customerName); + } + + public String getCustomerName() + { + return customerName; + } + + public void setCustomerName(String customerName) + { + this.customerName = customerName; + } + + public Integer getCustomerId() + { + return customerId; + } + + public void setCustomerId(Integer customerId) + { + this.customerId = customerId; + } + + public Long getOrderId() + { + return orderId; + } + + public void setOrderId(Long orderId) + { + this.orderId = orderId; + } + + public Double getTotal() + { + return total; + } + + public void setTotal(Double total) + { + this.total = total; + } + + @Override + public String toString() + { + return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName=" + + customerName + "]"; + } + + } + + public static class Order + { + + private int orderId; + + public 
Order() + { + + } + + public Order(int orderId) + { + this.orderId = orderId; + } + + public int getOrderId() + { + return orderId; + } + + public void setOrderId(int orderId) + { + this.orderId = orderId; + } + + @Override + public String toString() + { + return "Order [orderId=" + orderId + "]"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5075ce0e/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java new file mode 100644 index 0000000..772a057 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.avro; + +import java.util.List; +import java.util.ListIterator; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.python.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.testbench.CollectorTestSink; + +public class PojoToAvroTest +{ + + private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + "" + + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\"," + + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"}," + + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}"; + + CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>(); + PojoToAvro avroWriter = new PojoToAvro(); + + public class TestMeta extends TestWatcher + { + public String dir = null; + Context.OperatorContext context; + Context.PortContext portContext; + + @Override + protected void starting(org.junit.runner.Description description) + { + Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class); + portContext = new TestPortContext(portAttributes); + super.starting(description); + avroWriter.output.setSink(outputSink); + } + + @Override + protected void finished(Description description) + { + avroWriter.teardown(); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testWriting() throws Exception + { + + List<SimpleOrder> orderList = Lists.newArrayList(); + orderList.add(new SimpleOrder(1, 11, 100.25, "customerOne")); + orderList.add(new SimpleOrder(2, 22, 200.25, "customerTwo")); + orderList.add(new SimpleOrder(3, 33, 300.25, 
"customerThree")); + + avroWriter.setSchemaString(AVRO_SCHEMA); + avroWriter.data.setup(testMeta.portContext); + avroWriter.setup(testMeta.context); + + avroWriter.beginWindow(0); + + ListIterator<SimpleOrder> itr = orderList.listIterator(); + + while (itr.hasNext()) { + avroWriter.data.process(itr.next()); + } + + avroWriter.endWindow(); + Assert.assertEquals("Number of tuples", 3, outputSink.collectedTuples.size()); + avroWriter.teardown(); + + } + + @Test + public void testWriteFailure() throws Exception + { + + List<Order> orderList = Lists.newArrayList(); + orderList.add(new Order(11)); + orderList.add(new Order(22)); + orderList.add(new Order(33)); + + avroWriter.setSchemaString(AVRO_SCHEMA); + + avroWriter.setup(testMeta.context); + avroWriter.data.setup(testMeta.portContext); + + avroWriter.beginWindow(0); + + ListIterator<Order> itr = orderList.listIterator(); + + while (itr.hasNext()) { + avroWriter.data.process(itr.next()); + } + + Assert.assertEquals("Field write failures", 12, avroWriter.fieldErrorCount); + + Assert.assertEquals("Record write failures", 3, avroWriter.errorCount); + + avroWriter.endWindow(); + + Assert.assertEquals("Number of tuples", 0, outputSink.collectedTuples.size()); + + avroWriter.teardown(); + + } + + public static class SimpleOrder + { + + private Integer customerId; + private Long orderId; + private Double total; + private String customerName; + + public SimpleOrder() + { + } + + public SimpleOrder(int customerId, long orderId, double total, String customerName) + { + setCustomerId(customerId); + setOrderId(orderId); + setTotal(total); + setCustomerName(customerName); + } + + public String getCustomerName() + { + return customerName; + } + + public void setCustomerName(String customerName) + { + this.customerName = customerName; + } + + public Integer getCustomerId() + { + return customerId; + } + + public void setCustomerId(Integer customerId) + { + this.customerId = customerId; + } + + public Long getOrderId() + { + return 
orderId; + } + + public void setOrderId(Long orderId) + { + this.orderId = orderId; + } + + public Double getTotal() + { + return total; + } + + public void setTotal(Double total) + { + this.total = total; + } + + @Override + public String toString() + { + return "SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName=" + + customerName + "]"; + } + + } + + public static class Order + { + + private int orderId; + + public Order() + { + + } + + public Order(int orderId) + { + this.orderId = orderId; + } + + public int getOrderId() + { + return orderId; + } + + public void setOrderId(int orderId) + { + this.orderId = orderId; + } + + @Override + public String toString() + { + return "Order [orderId=" + orderId + "]"; + } + + } + +}
