http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java new file mode 100644 index 0000000..68343ce --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/planner/RelNodeVisitor.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.planner; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.apex.malhar.sql.operators.OperatorUtils; +import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.RelNode; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; + +/** + * This is the main class that converts relational algebra to a sub-DAG. + */ +@InterfaceStability.Evolving +public class RelNodeVisitor +{ + private final DAG dag; + private final TupleSchemaRegistry tupleSchemaRegistry; + private final JavaTypeFactory typeFactory; + + public RelNodeVisitor(DAG dag, JavaTypeFactory typeFactory) + { + this.dag = dag; + this.typeFactory = typeFactory; + this.tupleSchemaRegistry = new TupleSchemaRegistry(); + } + + /** + * This is the main method of this relational node visitor; it traverses the relational algebra in reverse direction (inputs first) + * and populates the given underlying DAG object. + * + * @param relNode RelNode which needs to be traversed.
+ * + * @return RelInfo representing information of current stage + * @throws Exception + */ + public final RelInfo traverse(RelNode relNode) throws Exception + { + List<RelInfo> inputStreams = new ArrayList<>(); + for (RelNode input : relNode.getInputs()) { + inputStreams.add(traverse(input)); + } + + ApexRelNode.RelContext relContext = new ApexRelNode.RelContext(dag, typeFactory, tupleSchemaRegistry); + + RelInfo currentNodeRelInfo; + ApexRelNode apexRelNode = ApexRelNode.relNodeMapping.get(relNode.getClass()); + if (apexRelNode == null) { + throw new UnsupportedOperationException("RelNode " + relNode.getRelTypeName() + " is not supported."); + } + currentNodeRelInfo = apexRelNode.visit(relContext, relNode, inputStreams); + + if (currentNodeRelInfo != null && inputStreams.size() != 0) { + for (int i = 0; i < inputStreams.size(); i++) { + RelInfo inputStream = inputStreams.get(i); + Operator.OutputPort outputPort = inputStream.getOutPort(); + Operator.InputPort inputPort = currentNodeRelInfo.getInputPorts().get(i); + + String streamName = OperatorUtils.getUniqueStreamName(inputStream.getRelName(), + currentNodeRelInfo.getRelName()); + Class schema; + if (inputStream.getOutRelDataType() != null) { + schema = TupleSchemaRegistry.getSchemaForRelDataType(tupleSchemaRegistry, streamName, + inputStream.getOutRelDataType()); + } else if (inputStream.getClazz() != null) { + schema = inputStream.getClazz(); + } else { + throw new RuntimeException("Unexpected condition reached."); + } + dag.setOutputPortAttribute(outputPort, Context.PortContext.TUPLE_CLASS, schema); + dag.setInputPortAttribute(inputPort, Context.PortContext.TUPLE_CLASS, schema); + dag.addStream(streamName, outputPort, inputPort); + } + } + + if (currentNodeRelInfo.getOutPort() == null) { + // End of the pipeline. + String schemaJar = tupleSchemaRegistry.generateCommonJar(); + + String jars = dag.getAttributes().get(Context.DAGContext.LIBRARY_JARS); + dag.setAttribute(Context.DAGContext.LIBRARY_JARS, + ((jars != null) && (jars.length() != 0)) ? jars + "," + schemaJar : schemaJar); + } + + return currentNodeRelInfo; + } +}
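The visitor above is only reached through the planning layer once Calcite has produced a physical plan. What follows is a minimal sketch of that hand-off, not part of this commit: the helper class and method names are hypothetical, and producing the optimized RelNode and the JavaTypeFactory is left to whatever Calcite planner setup SQLExecEnvironment performs. The caller simply constructs the visitor with the DAG being populated and traverses from the root node.

package org.apache.apex.malhar.sql.planner;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.RelNode;

import com.datatorrent.api.DAG;

public class RelNodeVisitorSketch
{
  /**
   * Hands the optimized relational tree to RelNodeVisitor. traverse() recurses into the
   * inputs first, maps each supported RelNode to operators via ApexRelNode.relNodeMapping,
   * wires the connecting streams with the generated TUPLE_CLASS schemas and, once the end
   * of the pipeline is reached, adds the generated schema jar to LIBRARY_JARS.
   */
  public static RelInfo populate(DAG dag, JavaTypeFactory typeFactory, RelNode rootRel) throws Exception
  {
    return new RelNodeVisitor(dag, typeFactory).traverse(rootRel);
  }
}

Applications normally reach this code only indirectly through SQLExecEnvironment.executeSQL(), as the tests later in this commit show.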
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java new file mode 100644 index 0000000..6d16f63 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTable.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.schema; + +import java.util.Map; + +import org.apache.apex.malhar.sql.table.Endpoint; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.schema.StreamableTable; +import org.apache.calcite.schema.Table; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.ImmutableList; + +/** + * This is representation of Apex source/destination to Calcite's {@link StreamableTable} table. + * Any table that gets registered with {@link org.apache.apex.malhar.sql.SQLExecEnvironment} + * gets registered as {@link ApexSQLTable}. 
+ */ [email protected] +public class ApexSQLTable implements ScannableTable, StreamableTable +{ + private SchemaPlus schema; + private String name; + private Map<String, Object> operands; + private RelDataType rowType; + private Endpoint endpoint; + + public ApexSQLTable(SchemaPlus schemaPlus, String name, Map<String, Object> operands, RelDataType rowType, + Endpoint endpoint) + { + this.schema = schemaPlus; + this.name = name; + this.operands = operands; + this.rowType = rowType; + this.endpoint = endpoint; + } + + public ApexSQLTable(SchemaPlus schema, String name, Endpoint endpoint) + { + this(schema, name, null, null, endpoint); + } + + @Override + public Enumerable<Object[]> scan(DataContext dataContext) + { + return null; + } + + @Override + public Table stream() + { + return new ApexSQLTable(schema, name, operands, rowType, endpoint); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) + { + if (rowType == null) { + rowType = endpoint.getRowType(relDataTypeFactory); + } + return rowType; + } + + @Override + public Statistic getStatistic() + { + return Statistics.of(100d, ImmutableList.<ImmutableBitSet>of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() + { + return Schema.TableType.STREAM; + } + + public SchemaPlus getSchema() + { + return schema; + } + + public String getName() + { + return name; + } + + public Map<String, Object> getOperands() + { + return operands; + } + + public RelDataType getRowType() + { + return rowType; + } + + public Endpoint getEndpoint() + { + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java new file mode 100644 index 0000000..c18f854 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/ApexSQLTableFactory.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sql.schema; + +import java.util.Map; + +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.Endpoint; +import org.apache.apex.malhar.sql.table.FileEndpoint; +import org.apache.apex.malhar.sql.table.KafkaEndpoint; +import org.apache.apex.malhar.sql.table.MessageFormat; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.TableFactory; +import org.apache.hadoop.classification.InterfaceStability; + [email protected] +public class ApexSQLTableFactory implements TableFactory<Table> +{ + @SuppressWarnings("unchecked") + @Override + public Table create(SchemaPlus schemaPlus, String name, Map<String, Object> operands, RelDataType rowType) + { + Endpoint endpoint; + String endpointSystemType = (String)operands.get(Endpoint.ENDPOINT); + + if (endpointSystemType.equalsIgnoreCase(Endpoint.EndpointType.FILE.name())) { + endpoint = new FileEndpoint(); + } else if (endpointSystemType.equalsIgnoreCase(Endpoint.EndpointType.KAFKA.name())) { + endpoint = new KafkaEndpoint(); + } else { + throw new RuntimeException("Cannot find endpoint"); + } + endpoint.setEndpointOperands((Map<String, Object>)operands.get(Endpoint.SYSTEM_OPERANDS)); + + MessageFormat mf; + String messageFormat = (String)operands.get(MessageFormat.MESSAGE_FORMAT); + if (messageFormat.equalsIgnoreCase(MessageFormat.MessageFormatType.CSV.name())) { + mf = new CSVMessageFormat(); + } else { + throw new RuntimeException("Cannot find message format"); + } + mf.setMessageFormatOperands((Map<String, Object>)operands.get(MessageFormat.MESSAGE_FORMAT_OPERANDS)); + + endpoint.setMessageFormat(mf); + + return new ApexSQLTable(schemaPlus, name, operands, rowType, endpoint); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java new file mode 100644 index 0000000..7924298 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sql.schema; + +import java.io.File; +import java.io.IOException; +import java.sql.Time; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.jar.JarOutputStream; +import java.util.zip.ZipEntry; + +import org.codehaus.jettison.json.JSONException; + +import org.apache.apex.malhar.lib.utils.ClassLoaderUtils; +import org.apache.apex.malhar.sql.codegen.BeanClassGenerator; +import org.apache.apex.malhar.sql.operators.OperatorUtils; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; + +import org.apache.commons.lang.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + [email protected] +public class TupleSchemaRegistry +{ + public static final String FQCN_PACKAGE = "org.apache.apex.generated.schema."; + private Map<String, Schema> schemas = new HashMap<>(); + + public Schema createNewSchema(String name) + { + if (schemas.containsKey(name)) { + return schemas.get(name); + } + + Schema schema = new Schema(); + schema.name = name; + schemas.put(name, schema); + + return schema; + } + + public Schema getSchemaDefinition(String name) + { + return schemas.get(name); + } + + public String generateCommonJar() throws IOException + { + File file = File.createTempFile("schemaSQL", ".jar"); + + FileSystem fs = FileSystem.newInstance(file.toURI(), new Configuration()); + FSDataOutputStream out = fs.create(new Path(file.getAbsolutePath())); + JarOutputStream jout = new JarOutputStream(out); + + for (Schema schema : schemas.values()) { + jout.putNextEntry(new ZipEntry(schema.fqcn.replace(".", "/") + ".class")); + jout.write(schema.beanClassBytes); + jout.closeEntry(); + } + + jout.close(); + out.close(); + + return file.getAbsolutePath(); + } + + public static Class getSchemaForRelDataType(TupleSchemaRegistry registry, String schemaName, RelDataType rowType) + { + if (rowType.isStruct()) { + TupleSchemaRegistry.Schema newSchema = registry.createNewSchema(schemaName); + for (RelDataTypeField field : rowType.getFieldList()) { + RelDataType type = field.getType(); + newSchema.addField(OperatorUtils.getValidFieldName(field), convertPrimitiveToSqlType(type)); + } + try { + newSchema.generateBean(); + } catch (IOException | JSONException e) { + throw new RuntimeException("Failed to generate schema", e); + } + return newSchema.beanClass; + } else { + throw new UnsupportedOperationException("Non-struct row type is not implemented."); + } + } + + private static Class convertPrimitiveToSqlType(RelDataType type) + { + /* I hope that following this method instead of calling value.value() is better + because we can catch any type mismatches. 
*/ + switch (type.getSqlTypeName()) { + case BOOLEAN: + return Boolean.class; + case TINYINT: + case SMALLINT: + case INTEGER: + return Integer.class; + case BIGINT: + return Long.class; + case REAL: + return Float.class; + case FLOAT: + case DOUBLE: + return Double.class; + case DATE: + return Date.class; + case TIME: + return Date.class; + case TIMESTAMP: + return Date.class; + case CHAR: + case VARCHAR: + return String.class; + case BINARY: + case VARBINARY: + return Byte.class; + case ANY: + case SYMBOL: + return Object.class; + default: + throw new RuntimeException(String.format("Unsupported type %s", type.getSqlTypeName())); + } + } + + public enum Type + { + BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class), + FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class), OBJECT(Object.class), + DATE(Date.class), TIME(Time.class); + + private Class javaType; + + Type(Class javaType) + { + this.javaType = javaType; + } + + public static Type getFromJavaType(Class type) + { + for (Type supportType : Type.values()) { + if (supportType.getJavaType() == ClassUtils.primitiveToWrapper(type)) { + return supportType; + } + } + + return OBJECT; + } + + public Class getJavaType() + { + return javaType; + } + } + + public static class Schema + { + public String name; + public String fqcn; + public List<SQLFieldInfo> fieldList = new ArrayList<>(); + public Class beanClass; + public byte[] beanClassBytes; + + public Schema addField(String fieldName, Class fieldType) + { + fieldList.add(new SQLFieldInfo(fieldName, Type.getFromJavaType(fieldType))); + return this; + } + + public Schema generateBean() throws IOException, JSONException + { + // Generate + this.fqcn = FQCN_PACKAGE + name; + + // Use Bean Class generator to generate the class + this.beanClassBytes = BeanClassGenerator.createAndWriteBeanClass(this.fqcn, fieldList); + this.beanClass = ClassLoaderUtils.readBeanClass(fqcn, beanClassBytes); + + return this; + } + } + + public static class SQLFieldInfo + { + String columnName; + Type type; + + public SQLFieldInfo(String columnName, Type type) + { + this.columnName = columnName; + this.type = type; + } + + public String getColumnName() + { + return columnName; + } + + public Type getType() + { + return type; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java new file mode 100644 index 0000000..a96df65 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/CSVMessageFormat.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.table; + +import java.util.Map; + +import org.apache.apex.malhar.sql.operators.OperatorUtils; +import org.apache.apex.malhar.sql.planner.RelInfo; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.contrib.formatter.CsvFormatter; +import com.datatorrent.contrib.parser.CsvParser; +import com.datatorrent.contrib.parser.DelimitedSchema; + [email protected] +public class CSVMessageFormat implements MessageFormat +{ + public static final String CSV_SCHEMA = "schema"; + private Map<String, Object> operands; + + public CSVMessageFormat() + { + } + + public CSVMessageFormat(String schema) + { + this.operands = ImmutableMap.<String, Object>of(CSV_SCHEMA, schema); + } + + @Override + public MessageFormatType getMessageFormatType() + { + return MessageFormatType.CSV; + } + + @Override + public void setMessageFormatOperands(Map<String, Object> operands) + { + this.operands = operands; + } + + @Override + public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory) + { + CsvParser csvParser = dag.addOperator(OperatorUtils.getUniqueOperatorName("CSVParser"), CsvParser.class); + csvParser.setSchema((String)operands.get(CSV_SCHEMA)); + + return new RelInfo("CSVParser", Lists.<Operator.InputPort>newArrayList(csvParser.in), csvParser, csvParser.out, + getRowType(typeFactory)); + } + + @Override + public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory) + { + CsvFormatter formatter = dag.addOperator(OperatorUtils.getUniqueOperatorName("CSVFormatter"), CsvFormatter.class); + formatter.setSchema((String)operands.get(CSV_SCHEMA)); + + return new RelInfo("CSVFormatter", Lists.<Operator.InputPort>newArrayList(formatter.in), formatter, formatter.out, + getRowType(typeFactory)); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + String schema = (String)operands.get(CSV_SCHEMA); + RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); + + DelimitedSchema delimitedSchema = new DelimitedSchema(schema); + for (DelimitedSchema.Field field : delimitedSchema.getFields()) { + builder.add(field.getName(), convertField(typeFactory, field.getType())); + } + + return builder.build(); + } + + private RelDataType convertField(RelDataTypeFactory typeFactory, DelimitedSchema.FieldType type) + { + RelDataType relDataType; + switch (type) { + case BOOLEAN: + relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN); + break; + case DOUBLE: + relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE); + break; + case INTEGER: + relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER); + break; + case FLOAT: + relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT); + break; + case LONG: + relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT); + break; + case SHORT: + relDataType = typeFactory.createSqlType(SqlTypeName.SMALLINT); + break; + case CHARACTER: + relDataType = typeFactory.createSqlType(SqlTypeName.CHAR); + break; + case STRING: + relDataType = 
typeFactory.createSqlType(SqlTypeName.VARCHAR); + break; + case DATE: + relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + break; + default: + relDataType = typeFactory.createSqlType(SqlTypeName.ANY); + } + + return relDataType; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java new file mode 100644 index 0000000..41a26de --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/Endpoint.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.table; + +import java.util.Map; + +import org.apache.apex.malhar.sql.planner.RelInfo; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.DAG; + +/** + * This interface defines abstract table and how it should be operated with. + * Endpoint interface can be implemented for any type of data source eg. Kafka, File, JDBC etc. + * Implementation of Endpoint interface should define how the table should represented for both input OR output side. + */ [email protected] +public interface Endpoint +{ + String ENDPOINT = "endpoint"; + String SYSTEM_OPERANDS = "endpointOperands"; + + /** + * Returns target type system + * @return Returns target type system + */ + EndpointType getTargetType(); + + /** + * Set Endpoint operands. This method is used when the table definitions are provided using calcite schema format. + * This is the map which is present against key "endpointOperands" in calcite schema definition input file. + * + * @param operands Map of endpoint operands. + */ + void setEndpointOperands(Map<String, Object> operands); + + /** + * Message Format type which defines how the data should be interpreted for both input and output side. + * + * @param messageFormat Object of type MessageFormat + */ + void setMessageFormat(MessageFormat messageFormat); + + /** + * Implementation of this method should populate Apex DAG if this table is at input side of pipeline. + * + * @param dag {@link DAG} object to be populated + * @param typeFactory Java Type Factory + * + * @return Returns {@link RelInfo} describing output of this input phase. + */ + RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory); + + /** + * Implementation of this method should populate Apex DAG if table is at output side of pipeline. 
+ * + * @param dag {@link DAG} object to be populated + * @param typeFactory Java Type Factory + * @return Returns {@link RelInfo} describing expected input of this output phase. + */ + RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory); + + /** + * This method returns what should be the input data type to output phase OR output data type of input phase. + * + * @param typeFactory Java Type Factory for data type conversions. + * + * @return {@link RelDataType} representing data type format. + */ + RelDataType getRowType(RelDataTypeFactory typeFactory); + + /** + * Type of Endpoints + */ + enum EndpointType + { + FILE, + KAFKA, + PORT + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java new file mode 100644 index 0000000..cac32a4 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/FileEndpoint.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.table; + +import java.util.Map; + +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator; +import org.apache.apex.malhar.sql.operators.LineReader; +import org.apache.apex.malhar.sql.operators.OperatorUtils; +import org.apache.apex.malhar.sql.planner.RelInfo; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; + +/** + * This is an implementation of {@link Endpoint} which defined how data should be read/written to file system. 
+ */ [email protected] +public class FileEndpoint implements Endpoint +{ + public static final String FILE_INPUT_DIRECTORY = "directory"; + public static final String FILE_OUT_PATH = "outputFilePath"; + public static final String FILE_OUT_NAME = "outputFileName"; + + private MessageFormat messageFormat; + + private Map<String, Object> operands; + + public FileEndpoint() + { + } + + public FileEndpoint(String directory, MessageFormat messageFormat) + { + this.messageFormat = messageFormat; + this.operands = ImmutableMap.<String, Object>of(FILE_INPUT_DIRECTORY, directory); + } + + public FileEndpoint(String directory, String fileName, MessageFormat messageFormat) + { + this.messageFormat = messageFormat; + this.operands = ImmutableMap.<String, Object>of(FILE_OUT_PATH, directory, FILE_OUT_NAME, fileName); + } + + @Override + public EndpointType getTargetType() + { + return EndpointType.FILE; + } + + @Override + public void setEndpointOperands(Map<String, Object> operands) + { + this.operands = operands; + } + + @Override + public void setMessageFormat(MessageFormat messageFormat) + { + this.messageFormat = messageFormat; + } + + @Override + public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory) + { + LineReader fileInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("FileInput"), LineReader.class); + fileInput.setDirectory((String)operands.get(FILE_INPUT_DIRECTORY)); + + RelInfo spec = messageFormat.populateInputDAG(dag, typeFactory); + dag.addStream(OperatorUtils.getUniqueStreamName("File", "Parser"), fileInput.output, spec.getInputPorts().get(0)); + return new RelInfo("Input", Lists.<Operator.InputPort>newArrayList(), spec.getOperator(), spec.getOutPort(), + messageFormat.getRowType(typeFactory)); + } + + @Override + public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory) + { + RelInfo spec = messageFormat.populateOutputDAG(dag, typeFactory); + + GenericFileOutputOperator.StringFileOutputOperator fileOutput = + dag.addOperator(OperatorUtils.getUniqueOperatorName("FileOutput"), + GenericFileOutputOperator.StringFileOutputOperator.class); + fileOutput.setFilePath((String)operands.get(FILE_OUT_PATH)); + fileOutput.setOutputFileName((String)operands.get(FILE_OUT_NAME)); + + dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "File"), spec.getOutPort(), fileOutput.input); + + return new RelInfo("Output", spec.getInputPorts(), spec.getOperator(), null, messageFormat.getRowType(typeFactory)); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return messageFormat.getRowType(typeFactory); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java new file mode 100644 index 0000000..56419c3 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/KafkaEndpoint.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.table; + +import java.util.Map; +import java.util.Properties; + +import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; +import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator; +import org.apache.apex.malhar.sql.operators.OperatorUtils; +import org.apache.apex.malhar.sql.planner.RelInfo; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; + +/** + * This is an implementation of {@link Endpoint} which defined how data should be read/written from kafka messaging system + */ [email protected] +public class KafkaEndpoint implements Endpoint +{ + public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + + public static final String KAFKA_SERVERS = "servers"; + public static final String KAFKA_TOPICS = "topics"; + + private MessageFormat messageFormat; + + private Map<String, Object> operands; + + public KafkaEndpoint() + { + } + + public KafkaEndpoint(String kafkaServers, String topics, MessageFormat messageFormat) + { + this.messageFormat = messageFormat; + this.operands = ImmutableMap.<String, Object>of(KAFKA_SERVERS, kafkaServers, KAFKA_TOPICS, topics); + } + + @Override + public EndpointType getTargetType() + { + return EndpointType.KAFKA; + } + + @Override + public void setEndpointOperands(Map<String, Object> operands) + { + this.operands = operands; + } + + @Override + public void setMessageFormat(MessageFormat messageFormat) + { + this.messageFormat = messageFormat; + } + + @Override + public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory) + { + KafkaSinglePortInputOperator kafkaInput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaInput"), + KafkaSinglePortInputOperator.class); + kafkaInput.setTopics((String)operands.get(KAFKA_TOPICS)); + kafkaInput.setInitialOffset("EARLIEST"); + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, operands.get(KAFKA_SERVERS)); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER); + kafkaInput.setConsumerProps(props); + + kafkaInput.setClusters((String)operands.get(KAFKA_SERVERS)); + + RelInfo spec = messageFormat.populateInputDAG(dag, typeFactory); + 
dag.addStream(OperatorUtils.getUniqueStreamName("Kafka", "Parser"), kafkaInput.outputPort, + spec.getInputPorts().get(0)); + return new RelInfo("Input", Lists.<Operator.InputPort>newArrayList(), spec.getOperator(), spec.getOutPort(), + messageFormat.getRowType(typeFactory)); + } + + @Override + public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory) + { + RelInfo spec = messageFormat.populateOutputDAG(dag, typeFactory); + + KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator(OperatorUtils.getUniqueOperatorName("KafkaOutput"), + KafkaSinglePortOutputOperator.class); + kafkaOutput.setTopic((String)operands.get(KAFKA_TOPICS)); + + Properties props = new Properties(); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, operands.get(KAFKA_SERVERS)); + kafkaOutput.setProperties(props); + + dag.addStream(OperatorUtils.getUniqueStreamName("Formatter", "Kafka"), spec.getOutPort(), kafkaOutput.inputPort); + + return new RelInfo("Output", spec.getInputPorts(), spec.getOperator(), null, messageFormat.getRowType(typeFactory)); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return messageFormat.getRowType(typeFactory); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java new file mode 100644 index 0000000..80fef93 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/MessageFormat.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql.table; + +import java.util.Map; + +import org.apache.apex.malhar.sql.planner.RelInfo; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.DAG; + +/** + * This interface defines how message should be parsed from input or formatted for output. + * The implementation of this interface should define both parsing and formatting representation for data. 
+ */ [email protected] +public interface MessageFormat +{ + String MESSAGE_FORMAT = "messageFormat"; + String MESSAGE_FORMAT_OPERANDS = "messageFormatOperands"; + + /** + * Gives type of {@link MessageFormat} + * @return Returns type of {@link MessageFormat} + */ + MessageFormatType getMessageFormatType(); + + /** + * Set messageFormat operands. This method is used when the table definitions are provided using calcite schema format. + * This is the map which is present against key "endpointOperands" in calcite schema definition input file. + * @param operands + */ + void setMessageFormatOperands(Map<String, Object> operands); + + /** + * Implementation of this method should populate the DAG for parsing logic for the data received from {@link Endpoint} + * + * @param dag {@link DAG} object to be populated + * @param typeFactory Java Type Factory + * @return Returns {@link RelInfo} defining output data type definition after parsing of data. + */ + RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory); + + /** + * Implementation of this method should populate the DAG for formatting logic of data to be written to {@link Endpoint} + * + * @param dag {@link DAG} object to be populated + * @param typeFactory Java Type Factory + * @return Returns {@link RelInfo} defining expected input for formatting ot data. + */ + RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory); + + /** + * This method returns what should be the input data type to output phase OR output data type of input phase. + * + * @param typeFactory Java Type Factory for data type conversions. + * + * @return {@link RelDataType} representing data type format. + */ + RelDataType getRowType(RelDataTypeFactory typeFactory); + + /** + * Message Format types + */ + enum MessageFormatType + { + CSV + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java b/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java new file mode 100644 index 0000000..5462e42 --- /dev/null +++ b/sql/src/main/java/org/apache/apex/malhar/sql/table/StreamEndpoint.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sql.table; + +import java.lang.reflect.Field; +import java.util.Date; +import java.util.Map; + +import org.apache.apex.malhar.sql.planner.RelInfo; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; + +/** + * This is an implementation of {@link Endpoint} which defined how data should be read/written to a Apex streaming port. + */ [email protected] +public class StreamEndpoint implements Endpoint +{ + private Operator.InputPort inputPort; + private Operator.OutputPort outputPort; + private Class pojoClass; + private Map<String, Class> fieldMapping; + + public StreamEndpoint(Operator.InputPort port, Class pojoClass) + { + this.inputPort = port; + this.pojoClass = pojoClass; + } + + public StreamEndpoint(Operator.InputPort port, Map<String, Class> fieldMapping) + { + this.inputPort = port; + this.fieldMapping = fieldMapping; + } + + public StreamEndpoint(Operator.OutputPort outputPort, Class pojoClass) + { + this.outputPort = outputPort; + this.pojoClass = pojoClass; + } + + public StreamEndpoint(Operator.OutputPort port, Map<String, Class> fieldMapping) + { + this.outputPort = port; + this.fieldMapping = fieldMapping; + } + + @Override + public EndpointType getTargetType() + { + return EndpointType.PORT; + } + + @Override + public void setEndpointOperands(Map<String, Object> operands) + { + } + + @Override + public void setMessageFormat(MessageFormat messageFormat) + { + } + + @Override + public RelInfo populateInputDAG(DAG dag, JavaTypeFactory typeFactory) + { + return new RelInfo("StreamInput", Lists.<Operator.InputPort>newArrayList(), null, outputPort, getRowType(typeFactory)); + } + + @Override + public RelInfo populateOutputDAG(DAG dag, JavaTypeFactory typeFactory) + { + return new RelInfo("StreamOutput", Lists.newArrayList(inputPort), null, null, getRowType(typeFactory)); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); + if (fieldMapping != null) { + for (Map.Entry<String, Class> entry : fieldMapping.entrySet()) { + builder.add(entry.getKey(), convertField(typeFactory, entry.getValue())); + } + } else if (pojoClass != null) { + for (Field field : pojoClass.getDeclaredFields()) { + builder.add(field.getName(), convertField(typeFactory, field.getType())); + } + } else { + throw new RuntimeException("Either fieldMapping or pojoClass needs to be set."); + } + + return builder.build(); + } + + private RelDataType convertField(RelDataTypeFactory typeFactory, Class<?> type) + { + RelDataType relDataType; + + if ((type == Boolean.class) || (type == boolean.class)) { + relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN); + } else if ((type == Double.class) || (type == double.class)) { + relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE); + } else if ((type == Integer.class) || (type == int.class)) { + relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER); + } else if ((type == Float.class) || (type == float.class)) { + relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT); + } else if ((type == Long.class) || (type == long.class)) { + relDataType = 
typeFactory.createSqlType(SqlTypeName.BIGINT); + } else if ((type == Short.class) || (type == short.class)) { + relDataType = typeFactory.createSqlType(SqlTypeName.SMALLINT); + } else if ((type == Character.class) || (type == char.class) || (type == Byte.class) || (type == byte.class)) { + relDataType = typeFactory.createSqlType(SqlTypeName.CHAR); + } else if (type == String.class) { + relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR); + } else if (type == Date.class) { + relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + } else { + relDataType = typeFactory.createSqlType(SqlTypeName.ANY); + } + return relDataType; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java ---------------------------------------------------------------------- diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java new file mode 100644 index 0000000..900fd10 --- /dev/null +++ b/sql/src/test/java/org/apache/apex/malhar/sql/FileEndpointTest.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sql; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.TimeZone; + +import javax.validation.ConstraintViolationException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.FileEndpoint; + +import org.apache.commons.io.FileUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Predicates; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; + +public class FileEndpointTest +{ + private TimeZone defaultTZ; + private static String outputFolder = "target/output/"; + + @Rule + public TestName testName = new TestName(); + + public static String apex_concat_str(String s1, String s2) + { + return s1 + s2; + } + + @Before + public void setUp() throws Exception + { + defaultTZ = TimeZone.getDefault(); + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + + outputFolder += testName.getMethodName() + "/"; + } + + @After + public void tearDown() throws Exception + { + TimeZone.setDefault(defaultTZ); + } + + @Test + public void testApplication() throws Exception + { + File modelFile = new File("src/test/resources/model/model_file_csv.json"); + String model = FileUtils.readFileToString(modelFile); + + PrintStream originalSysout = System.out; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new Application(model), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + waitTillStdoutIsPopulated(baos, 30000); + + lc.shutdown(); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } catch (Exception e) { + Assert.fail("Exception: " + e); + } + + System.setOut(originalSysout); + + String[] sout = baos.toString().split(System.lineSeparator()); + Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:")); + + String[] actualLines = filter.toArray(new String[filter.size()]); + Assert.assertEquals(6, actualLines.length); + Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1")); + Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2")); + Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3")); + Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4")); + Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5")); + Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6")); + } + + private boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException, + IOException + { + long now = System.currentTimeMillis(); + Collection<String> filter = 
Lists.newArrayList(); + while (System.currentTimeMillis() - now < timeout) { + baos.flush(); + String[] sout = baos.toString().split(System.lineSeparator()); + filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:")); + if (filter.size() != 0) { + break; + } + + Thread.sleep(500); + } + + return (filter.size() != 0); + } + + @Test + public void testApplicationSelectInsertWithAPI() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new ApplicationSelectInsertWithAPI(), conf); + + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + /** + * Wait time is 40 sec to ensure that checkpoint happens. AbstractFileOutputOperators flushes the stream + * in beforeCheckpoint call. + */ + Assert.assertTrue(waitTillFileIsPopulated(outputFolder, 40000)); + lc.shutdown(); + } catch (Exception e) { + Assert.fail("constraint violations: " + e); + } + + File file = new File(outputFolder); + File file1 = new File(outputFolder + file.list()[0]); + List<String> strings = FileUtils.readLines(file1); + + String[] actualLines = strings.toArray(new String[strings.size()]); + + String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4", "", + "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5", ""}; + Assert.assertTrue(Arrays.deepEquals(actualLines, expectedLines)); + } + + private boolean waitTillFileIsPopulated(String outputFolder, int timeout) throws IOException, InterruptedException + { + boolean result; + long now = System.currentTimeMillis(); + Path outDir = new Path("file://" + new File(outputFolder).getAbsolutePath()); + try (FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration())) { + List<String> strings = Lists.newArrayList(); + while (System.currentTimeMillis() - now < timeout) { + if (fs.exists(outDir)) { + File file = new File(outputFolder); + if (file.list().length > 0) { + File file1 = new File(outputFolder + file.list()[0]); + strings = FileUtils.readLines(file1); + if (strings.size() != 0) { + break; + } + } + } + + Thread.sleep(500); + } + + result = fs.exists(outDir) && (strings.size() != 0); + } + + return result; + } + + + public static class Application implements StreamingApplication + { + String model; + + public Application(String model) + { + this.model = model; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + SQLExecEnvironment.getEnvironment() + .withModel(model) + .executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS"); + } + } + + public static class ApplicationSelectInsertWithAPI implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"id\",\"type\":\"Integer\"}," + + "{\"name\":\"Product\",\"type\":\"String\"}," + + "{\"name\":\"units\",\"type\":\"Integer\"}]}"; + String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"Product\",\"type\":\"String\"}]}"; + + SQLExecEnvironment.getEnvironment() + .registerTable("ORDERS", new 
FileEndpoint("src/test/resources/input.csv", + new CSVMessageFormat(schemaIn))) + .registerTable("SALES", new FileEndpoint(outputFolder, "out.tmp", new CSVMessageFormat(schemaOut))) + .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str") + .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " + + "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " + + "PRODUCT LIKE 'paint%'"); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java ---------------------------------------------------------------------- diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java b/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java new file mode 100644 index 0000000..7162e31 --- /dev/null +++ b/sql/src/test/java/org/apache/apex/malhar/sql/InputPOJO.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql; + +import java.util.Date; + +public class InputPOJO +{ + private Date RowTime; + private int id; + private String Product; + private int units; + + public Date getRowTime() + { + return RowTime; + } + + public void setRowTime(Date rowTime) + { + RowTime = rowTime; + } + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public String getProduct() + { + return Product; + } + + public void setProduct(String product) + { + Product = product; + } + + public int getUnits() + { + return units; + } + + public void setUnits(int units) + { + this.units = units; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java ---------------------------------------------------------------------- diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java new file mode 100644 index 0000000..14eff70 --- /dev/null +++ b/sql/src/test/java/org/apache/apex/malhar/sql/KafkaEndpointTest.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.TimeZone; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.apex.malhar.kafka.EmbeddedKafka; +import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; +import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator; +import org.apache.apex.malhar.sql.table.CSVMessageFormat; +import org.apache.apex.malhar.sql.table.KafkaEndpoint; +import org.apache.apex.malhar.sql.table.StreamEndpoint; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.contrib.formatter.CsvFormatter; +import com.datatorrent.contrib.parser.CsvParser; + +public class KafkaEndpointTest +{ + private final String testTopicData0 = "dataTopic0"; + private final String testTopicData1 = "dataTopic1"; + private final String testTopicResult = "resultTopic"; + + private EmbeddedKafka kafka; + + private TimeZone defaultTZ; + + @Before + public void setup() throws IOException + { + defaultTZ = TimeZone.getDefault(); + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + + kafka = new EmbeddedKafka(); + kafka.start(); + kafka.createTopic(testTopicData0); + kafka.createTopic(testTopicData1); + kafka.createTopic(testTopicResult); + } + + @After + public void tearDown() throws IOException + { + kafka.stop(); + + TimeZone.setDefault(defaultTZ); + } + + @Test + public void testApplicationSelectInsertWithAPI() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new KafkaApplication(kafka.getBroker(), testTopicData0, testTopicResult), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11", + "15/02/2016 10:16:00 +0000,2,paint2,12", + "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14", + "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16")); + + // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char. 
+ String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n", + "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n"}; + + List<String> consume = kafka.consume(testTopicResult, 30000); + Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines)); + + lc.shutdown(); + } catch (Exception e) { + Assert.fail("constraint violations: " + e); + } + } + + @Test + public void testApplicationWithPortEndpoint() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new KafkaPortApplication(kafka.getBroker(), testTopicData0, testTopicResult), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11", + "15/02/2016 10:16:00 +0000,2,paint2,12", + "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14", + "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16")); + + // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char. + String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n", + "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n"}; + + List<String> consume = kafka.consume(testTopicResult, 30000); + + Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines)); + + lc.shutdown(); + } catch (Exception e) { + Assert.fail("constraint violations: " + e); + } + } + + @Test + public void testApplicationJoin() throws Exception + { + String sql = "INSERT INTO SALES " + + "SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " + + "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " + + "FROM ORDERS AS A " + + "JOIN CATEGORY AS B ON A.id = B.id " + + "WHERE A.id > 3 AND A.PRODUCT LIKE 'paint%'"; + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new KafkaJoinApplication(kafka.getBroker(), testTopicData0, testTopicData1, + testTopicResult, sql), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11", + "15/02/2016 10:16:00 +0000,2,paint2,12", + "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14", + "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16")); + + kafka.publish(testTopicData1, Arrays.asList("1,ABC", + "2,DEF", + "3,GHI", "4,JKL", + "5,MNO", "6,PQR")); + + // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char. 
+ String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4,JKL\r\n", + "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5,MNO\r\n"}; + + List<String> consume = kafka.consume(testTopicResult, 30000); + + Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines)); + + lc.shutdown(); + } catch (Exception e) { + Assert.fail("constraint violations: " + e); + } + } + + @Test + public void testApplicationJoinFilter() throws Exception + { + String sql = "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " + + "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " + + "FROM ORDERS AS A JOIN CATEGORY AS B ON A.id = B.id AND A.id > 3" + + "WHERE A.PRODUCT LIKE 'paint%'"; + + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new KafkaJoinApplication(kafka.getBroker(), testTopicData0, testTopicData1, + testTopicResult, sql), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11", + "15/02/2016 10:16:00 +0000,2,paint2,12", + "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14", + "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16")); + + kafka.publish(testTopicData1, Arrays.asList("1,ABC", + "2,DEF", + "3,GHI", "4,JKL", + "5,MNO", "6,PQR")); + + // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char. + String[] expectedLines = new String[]{"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4,JKL\r\n", + "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5,MNO\r\n"}; + + List<String> consume = kafka.consume(testTopicResult, 30000); + + Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines)); + + lc.shutdown(); + } catch (Exception e) { + Assert.fail("constraint violations: " + e); + } + } + + public static class KafkaApplication implements StreamingApplication + { + private String broker; + private String sourceTopic; + private String destTopic; + + public KafkaApplication(String broker, String sourceTopic, String destTopic) + { + this.broker = broker; + this.sourceTopic = sourceTopic; + this.destTopic = destTopic; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"id\",\"type\":\"Integer\"}," + + "{\"name\":\"Product\",\"type\":\"String\"}," + + "{\"name\":\"units\",\"type\":\"Integer\"}]}"; + String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"Product\",\"type\":\"String\"}]}"; + + SQLExecEnvironment.getEnvironment() + .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic, new CSVMessageFormat(schemaIn))) + .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(schemaOut))) + .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str") + .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " 
+ + "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " + + "PRODUCT LIKE 'paint%'"); + } + } + + public static class KafkaJoinApplication implements StreamingApplication + { + private String broker; + private String sourceTopic0; + private String sourceTopic1; + private String destTopic; + private String sql; + + public KafkaJoinApplication(String broker, String sourceTopic0, String sourceTopic1, String destTopic, String sql) + { + this.broker = broker; + this.sourceTopic0 = sourceTopic0; + this.sourceTopic1 = sourceTopic1; + this.destTopic = destTopic; + this.sql = sql; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String schemaIn0 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"id\",\"type\":\"Integer\"}," + + "{\"name\":\"Product\",\"type\":\"String\"}," + + "{\"name\":\"units\",\"type\":\"Integer\"}]}"; + String schemaIn1 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"id\",\"type\":\"Integer\"}," + + "{\"name\":\"Category\",\"type\":\"String\"}]}"; + String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"Product\",\"type\":\"String\"}," + + "{\"name\":\"Category\",\"type\":\"String\"}]}"; + + SQLExecEnvironment.getEnvironment() + .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic0, new CSVMessageFormat(schemaIn0))) + .registerTable("CATEGORY", new KafkaEndpoint(broker, sourceTopic1, new CSVMessageFormat(schemaIn1))) + .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(schemaOut))) + .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str") + .executeSQL(dag, sql); + } + } + + public static class KafkaPortApplication implements StreamingApplication + { + private String broker; + private String sourceTopic; + private String destTopic; + + public KafkaPortApplication(String broker, String sourceTopic, String destTopic) + { + this.broker = broker; + this.sourceTopic = sourceTopic; + this.destTopic = destTopic; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"id\",\"type\":\"Integer\"}," + + "{\"name\":\"Product\",\"type\":\"String\"}," + + "{\"name\":\"units\",\"type\":\"Integer\"}]}"; + String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + + "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + + "{\"name\":\"Product\",\"type\":\"String\"}]}"; + + KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class); + kafkaInput.setTopics(sourceTopic); + kafkaInput.setInitialOffset("EARLIEST"); + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER); + 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER); + kafkaInput.setConsumerProps(props); + kafkaInput.setClusters(broker); + + CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class); + csvParser.setSchema(schemaIn); + + dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in); + + CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class); + formatter.setSchema(schemaOut); + + KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class); + kafkaOutput.setTopic(destTopic); + + props = new Properties(); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + kafkaOutput.setProperties(props); + + dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort); + + SQLExecEnvironment.getEnvironment() + .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class)) + .registerTable("SALES", new StreamEndpoint(formatter.in, OutputPOJO.class)) + .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str") + .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " + + "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " + + "PRODUCT LIKE 'paint%'"); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java ---------------------------------------------------------------------- diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java b/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java new file mode 100644 index 0000000..fdf78d7 --- /dev/null +++ b/sql/src/test/java/org/apache/apex/malhar/sql/OutputPOJO.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.sql; + + +import java.util.Date; + +public class OutputPOJO +{ + private Date RowTime1; + private Date RowTime2; + private String Product; + + public Date getRowTime1() + { + return RowTime1; + } + + public void setRowTime1(Date rowTime1) + { + RowTime1 = rowTime1; + } + + public Date getRowTime2() + { + return RowTime2; + } + + public void setRowTime2(Date rowTime2) + { + RowTime2 = rowTime2; + } + + public String getProduct() + { + return Product; + } + + public void setProduct(String product) + { + Product = product; + } +}
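Note: the tests above register a scalar UDF with registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str"), but the body of that static method is not part of the hunks shown here. A minimal sketch consistent with how the tests invoke it (APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) turning an input product "paint4" into the expected output "OILPAINT4") could look like the following; the actual implementation in FileEndpointTest.java may differ.

  // Hypothetical sketch of the UDF referenced by registerFunction("APEXCONCAT", ...).
  // Calcite resolves the SQL function APEXCONCAT to a public static method by name,
  // so a plain string concatenation is sufficient for the usage seen in these tests.
  public static String apex_concat_str(String s1, String s2)
  {
    return s1 + s2;
  }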
