NIFI-1280 Create FilterCSVColumns Processor. Signed-off-by: Matt Burgess <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4d5872a3 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4d5872a3 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4d5872a3 Branch: refs/heads/master Commit: 4d5872a38500ed0541d7689107b34daedb5b7e34 Parents: 52cf9a7 Author: Toivo Adams <[email protected]> Authored: Sat May 7 12:29:15 2016 +0300 Committer: Matt Burgess <[email protected]> Committed: Tue Apr 11 19:29:04 2017 -0400 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 5 + .../calcite/adapter/csv/CsvEnumerator2.java | 303 +++++++++++++++++++ .../apache/calcite/adapter/csv/CsvSchema2.java | 98 ++++++ .../calcite/adapter/csv/CsvSchemaFactory2.java | 53 ++++ .../calcite/adapter/csv/CsvTableScan2.java | 104 +++++++ .../adapter/csv/CsvTranslatableTable2.java | 121 ++++++++ .../processors/standard/FilterCSVColumns.java | 258 ++++++++++++++++ .../standard/TestFilterCSVColumns.java | 117 +++++++ .../resources/TestFilterCSVColumns/Numeric.csv | 5 + .../resources/TestFilterCSVColumns/US500.csv | 1 + .../TestFilterCSVColumns/US500_typeless.csv | 1 + 11 files changed, 1066 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index dc77309..d410f43 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -283,6 +283,11 @@ language governing permissions and limitations under the License. 
--> </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-example-csv</artifactId> + <version>1.11.0</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java new file mode 100644 index 0000000..0f928ce --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.csv; + +import java.io.IOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.Pair; +import org.apache.commons.lang3.time.FastDateFormat; + +import au.com.bytecode.opencsv.CSVReader; + + +/** Enumerator that reads from a CSV stream. + * + * @param <E> Row type + */ +class CsvEnumerator2<E> implements Enumerator<E> { + private final CSVReader reader; + private final String[] filterValues; + private final RowConverter<E> rowConverter; + private E current; + + private static final FastDateFormat TIME_FORMAT_DATE; + private static final FastDateFormat TIME_FORMAT_TIME; + private static final FastDateFormat TIME_FORMAT_TIMESTAMP; + + static { + TimeZone gmt = TimeZone.getTimeZone("GMT"); + TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt); + TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt); + TIME_FORMAT_TIMESTAMP = + FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt); + } + + public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes) { + this(verifyNotNullReader(csvReader), fieldTypes, identityList(fieldTypes.size())); + } + + public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes, int[] fields) { + //noinspection unchecked + this(csvReader, null, (RowConverter<E>) converter(fieldTypes, fields)); + } + + public CsvEnumerator2(CSVReader csvReader, String[] filterValues, RowConverter<E> rowConverter) { + this.rowConverter = rowConverter; + this.filterValues = filterValues; + this.reader = csvReader; + } + + static public CSVReader verifyNotNullReader(CSVReader csvReader) { + if (csvReader==null) + throw new IllegalArgumentException("csvReader cannot be null"); + return csvReader; + } + + 
private static RowConverter<?> converter(List<CsvFieldType> fieldTypes, + int[] fields) { + if (fields.length == 1) { + final int field = fields[0]; + return new SingleColumnRowConverter(fieldTypes.get(field), field); + } else { + return new ArrayRowConverter(fieldTypes, fields); + } + } + + /** Deduces the names and types of a table's columns by reading the first line + * of a CSV stream. */ + static public RelDataType deduceRowType(JavaTypeFactory typeFactory, String[] firstLine, + List<CsvFieldType> fieldTypes) { + final List<RelDataType> types = new ArrayList<>(); + final List<String> names = new ArrayList<>(); + for (String string : firstLine) { + final String name; + final CsvFieldType fieldType; + final int colon = string.indexOf(':'); + if (colon >= 0) { + name = string.substring(0, colon); + String typeString = string.substring(colon + 1); + typeString = typeString.trim(); + fieldType = CsvFieldType.of(typeString); + if (fieldType == null) { + System.out.println("WARNING: Found unknown type: " + + typeString + " in first line: " + + " for column: " + name + + ". 
Will assume the type of column is string"); + } + } else { + name = string; + fieldType = null; + } + final RelDataType type; + if (fieldType == null) { + type = typeFactory.createJavaType(String.class); + } else { + type = fieldType.toType(typeFactory); + } + names.add(name); + types.add(type); + if (fieldTypes != null) { + fieldTypes.add(fieldType); + } + } + + if (names.isEmpty()) { + names.add("line"); + types.add(typeFactory.createJavaType(String.class)); + } + return typeFactory.createStructType(Pair.zip(names, types)); + } + + public E current() { + return current; + } + + public boolean moveNext() { + try { + outer: + for (;;) { + final String[] strings = reader.readNext(); + if (strings == null) { + current = null; + reader.close(); + return false; + } + if (filterValues != null) { + for (int i = 0; i < strings.length; i++) { + String filterValue = filterValues[i]; + if (filterValue != null) { + if (!filterValue.equals(strings[i])) { + continue outer; + } + } + } + } + current = rowConverter.convertRow(strings); + return true; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void reset() { + throw new UnsupportedOperationException(); + } + + public void close() { + try { + reader.close(); + } catch (IOException e) { + throw new RuntimeException("Error closing CSV reader", e); + } + } + + /** Returns an array of integers {0, ..., n - 1}. */ + static int[] identityList(int n) { + int[] integers = new int[n]; + for (int i = 0; i < n; i++) { + integers[i] = i; + } + return integers; + } + + /** Row converter. 
*/ + abstract static class RowConverter<E> { + abstract E convertRow(String[] rows); + + protected Object convert(CsvFieldType fieldType, String string) { + if (fieldType == null) { + return string; + } + switch (fieldType) { + case BOOLEAN: + if (string.length() == 0) { + return null; + } + return Boolean.parseBoolean(string); + case BYTE: + if (string.length() == 0) { + return null; + } + return Byte.parseByte(string); + case SHORT: + if (string.length() == 0) { + return null; + } + return Short.parseShort(string); + case INT: + if (string.length() == 0) { + return null; + } + return Integer.parseInt(string); + case LONG: + if (string.length() == 0) { + return null; + } + return Long.parseLong(string); + case FLOAT: + if (string.length() == 0) { + return null; + } + return Float.parseFloat(string); + case DOUBLE: + if (string.length() == 0) { + return null; + } + return Double.parseDouble(string); + case DATE: + if (string.length() == 0) { + return null; + } + try { + Date date = TIME_FORMAT_DATE.parse(string); + return new java.sql.Date(date.getTime()); + } catch (ParseException e) { + return null; + } + case TIME: + if (string.length() == 0) { + return null; + } + try { + Date date = TIME_FORMAT_TIME.parse(string); + return new java.sql.Time(date.getTime()); + } catch (ParseException e) { + return null; + } + case TIMESTAMP: + if (string.length() == 0) { + return null; + } + try { + Date date = TIME_FORMAT_TIMESTAMP.parse(string); + return new java.sql.Timestamp(date.getTime()); + } catch (ParseException e) { + return null; + } + case STRING: + default: + return string; + } + } + } + + /** Array row converter. 
*/ + static class ArrayRowConverter extends RowConverter<Object[]> { + private final CsvFieldType[] fieldTypes; + private final int[] fields; + + ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) { + this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]); + this.fields = fields; + } + + public Object[] convertRow(String[] strings) { + final Object[] objects = new Object[fields.length]; + for (int i = 0; i < fields.length; i++) { + int field = fields[i]; + objects[i] = convert(fieldTypes[field], strings[field]); + } + return objects; + } + } + + /** Single column row converter. */ + private static class SingleColumnRowConverter extends RowConverter { + private final CsvFieldType fieldType; + private final int fieldIndex; + + private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) { + this.fieldType = fieldType; + this.fieldIndex = fieldIndex; + } + + public Object convertRow(String[] strings) { + return convert(fieldType, strings[fieldIndex]); + } + } +} + +// End CsvEnumerator2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java new file mode 100644 index 0000000..f724f79 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.csv; + +import java.io.Reader; +import java.util.Map; + +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; + +import com.google.common.collect.ImmutableMap; + +/** + * Schema mapped onto a directory of CSV files. Each table in the schema + * is a CSV file in that directory. + */ +public class CsvSchema2 extends AbstractSchema { + final private Map<String, Reader> inputs; + private final CsvTable.Flavor flavor; + private Map<String, Table> tableMap; + + /** + * Creates a CSV schema. + * + * @param inputs Inputs map + * @param flavor Whether to instantiate flavor tables that undergo + * query optimization + */ + public CsvSchema2(Map<String, Reader> inputs, CsvTable.Flavor flavor) { + super(); + this.inputs = inputs; + this.flavor = flavor; + } + + /** Looks for a suffix on a string and returns + * either the string with the suffix removed + * or the original string. */ + private static String trim(String s, String suffix) { + String trimmed = trimOrNull(s, suffix); + return trimmed != null ? trimmed : s; + } + + /** Looks for a suffix on a string and returns + * either the string with the suffix removed + * or null. */ + private static String trimOrNull(String s, String suffix) { + return s.endsWith(suffix) + ? 
s.substring(0, s.length() - suffix.length()) + : null; + } + + @Override protected Map<String, Table> getTableMap() { + + if (tableMap!=null) + return tableMap; + + // Build a map from table name to table; each file becomes a table. + final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); + + for (Map.Entry<String, Reader> entry : inputs.entrySet()) { + final Table table = createTable(entry.getValue()); + builder.put(entry.getKey(), table); + } + + tableMap = builder.build(); + return tableMap; + } + + /** Creates different sub-type of table based on the "flavor" attribute. */ + private Table createTable(Reader readerx) { + switch (flavor) { + case TRANSLATABLE: + return new CsvTranslatableTable2(readerx, null); +// case SCANNABLE: +// return new CsvScannableTable(file, null); +// case FILTERABLE: +// return new CsvFilterableTable(file, null); + default: + throw new AssertionError("Unknown flavor " + flavor); + } + } +} + +// End CsvSchema2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java new file mode 100644 index 0000000..f8ec576 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.csv; + +import java.io.Reader; +import java.util.Map; + +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaFactory; +import org.apache.calcite.schema.SchemaPlus; + +/** + * Factory that creates a {@link CsvSchema}. + * + * <p>Allows a custom schema to be included in a <code><i>model</i>.json</code> + * file.</p> + */ +@SuppressWarnings("UnusedDeclaration") +public class CsvSchemaFactory2 implements SchemaFactory { + final private Map<String, Reader> inputs; + // public constructor, per factory contract + public CsvSchemaFactory2(Map<String, Reader> inputs) { + this.inputs = inputs; + } + + public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { + String flavorName = (String) operand.get("flavor"); + CsvTable.Flavor flavor; + if (flavorName == null) { + flavor = CsvTable.Flavor.SCANNABLE; + } else { + flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase()); + } + + return new CsvSchema2(inputs, flavor); + } +} + +// End CsvSchemaFactory2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java new file mode 100644 index 0000000..75f013c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.csv; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; + +import java.util.List; + +/** + * Relational expression representing a scan of a CSV stream. 
+ * + * <p>Like any table scan, it serves as a leaf node of a query tree.</p> + */ +public class CsvTableScan2 extends TableScan implements EnumerableRel { + final CsvTranslatableTable2 csvTable; + final int[] fields; + + protected CsvTableScan2(RelOptCluster cluster, RelOptTable table, + CsvTranslatableTable2 csvTable, int[] fields) { + super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table); + this.csvTable = csvTable; + this.fields = fields; + + assert csvTable != null; + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return new CsvTableScan2(getCluster(), table, csvTable, fields); + } + + @Override public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("fields", Primitive.asList(fields)); + } + + @Override public RelDataType deriveRowType() { + final List<RelDataTypeField> fieldList = table.getRowType().getFieldList(); + final RelDataTypeFactory.FieldInfoBuilder builder = + getCluster().getTypeFactory().builder(); + for (int field : fields) { + builder.add(fieldList.get(field)); + } + return builder.build(); + } + + @Override public void register(RelOptPlanner planner) { + planner.addRule(CsvProjectTableScanRule.INSTANCE); + } + + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + getRowType(), + pref.preferArray()); + + if (table instanceof JsonTable) { + return implementor.result( + physType, + Blocks.toBlock( + Expressions.call(table.getExpression(JsonTable.class), + "enumerable"))); + } + return implementor.result( + physType, + Blocks.toBlock( + Expressions.call(table.getExpression(CsvTranslatableTable2.class), + "project", Expressions.constant(fields)))); + } +} + +// End CsvTableScan.java 
http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java new file mode 100644 index 0000000..bc28fdd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.csv; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.TranslatableTable; + +import au.com.bytecode.opencsv.CSVReader; + +import java.io.IOException; +import java.io.Reader; +import java.lang.reflect.Type; +import java.util.ArrayList; + +/** + * Table based on a CSV stream. + */ +public class CsvTranslatableTable2 extends CsvTable + implements QueryableTable, TranslatableTable { + + final private CSVReader csvReader; + private CsvEnumerator2<Object> csvEnumerator2; + final private String[] firstLine; + + /** Creates a CsvTable. + */ + CsvTranslatableTable2(Reader readerx, RelProtoDataType protoRowType) { + super(null, protoRowType); + this.csvReader = new CSVReader(readerx); + try { + this.firstLine = csvReader.readNext(); + } catch (IOException e) { + throw new RuntimeException("csvReader.readNext() failed ", e); + } + } + + public String toString() { + return "CsvTranslatableTable2"; + } + + /** Returns an enumerable over a given projection of the fields. + * + * <p>Called from generated code. 
*/ + public Enumerable<Object> project(final int[] fields) { + return new AbstractEnumerable<Object>() { + public Enumerator<Object> enumerator() { + return csvEnumerator2; + } + }; + } + + public Expression getExpression(SchemaPlus schema, String tableName, + Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + public Type getElementType() { + return Object[].class; + } + + public <T> Queryable<T> asQueryable(QueryProvider queryProvider, + SchemaPlus schema, String tableName) { + throw new UnsupportedOperationException(); + } + + public RelNode toRel( + RelOptTable.ToRelContext context, + RelOptTable relOptTable) { + // Request all fields. + final int fieldCount = relOptTable.getRowType().getFieldCount(); + final int[] fields = CsvEnumerator.identityList(fieldCount); + return new CsvTableScan2(context.getCluster(), relOptTable, this, fields); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + RelDataType rowType = null; + + if (fieldTypes == null) { + fieldTypes = new ArrayList<CsvFieldType>(); + rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, fieldTypes); + } else { + rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, null); + } + + if (csvEnumerator2==null) + csvEnumerator2 = new CsvEnumerator2<Object>(csvReader, fieldTypes); + + return rowType; + } +} + +// End CsvTranslatableTable2.java http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java new file mode 100644 index 0000000..718f462 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import static java.sql.Types.CHAR; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.VARCHAR; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.calcite.adapter.csv.CsvSchemaFactory2; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; 
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.util.StopWatch;
+
+import com.google.common.collect.ImmutableMap;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"csv", "sql", "filter", "transform"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Filter out specific columns from CSV data. Some other transformations are also supported. "
+        + "Columns can be renamed, simple calculations performed, aggregations, etc. "
+        + "SQL select statement is used to specify how CSV data should be transformed. "
+        + "SQL statement follows standard SQL, some restrictions may apply. "
+        + "Successfully transformed CSV data is routed to the 'success' relationship. "
+        + "If transform fails, the original FlowFile is routed to the 'failure' relationship")
+/**
+ * Processor that rewrites the CSV content of a FlowFile with the result of a SQL {@code SELECT} statement.
+ *
+ * <p>The incoming content is registered as table {@code A} of a Calcite schema named {@code CSV}
+ * (the same schema is also registered as {@code default}), the configured statement is executed
+ * against it, and the rows of the result are written back to the FlowFile as CSV with a header row.</p>
+ */
+public class FilterCSVColumns extends AbstractProcessor {
+
+    public static final PropertyDescriptor SQL_SELECT = new PropertyDescriptor.Builder()
+            .name("SQL select statement")
+            .description("SQL select statement specifies how CSV data should be transformed. "
+                    + "Sql select should select from CSV.A table")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in CSV), it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(SQL_SELECT);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Takes one FlowFile, replaces its content with the CSV rendering of the configured query's result
+     * and routes it to {@link #REL_SUCCESS}. If writing the content raises a {@link ProcessException},
+     * the original FlowFile is routed to {@link #REL_FAILURE} instead.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        try {
+            FlowFile transformed = session.write(original, new StreamCallback() {
+                @Override
+                public void process(final InputStream rawIn, final OutputStream out) throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn)) {
+
+                        final String sql = context.getProperty(SQL_SELECT).getValue();
+                        // Read through the buffered stream; the raw stream was previously passed by mistake.
+                        final ResultSet queryResult = transform(in, sql);
+                        final Statement queryStatement = queryResult.getStatement();
+                        final Connection queryConnection = queryStatement == null ? null : queryStatement.getConnection();
+
+                        // transform() hands ownership of the JDBC resources to its caller. Resources are
+                        // closed in reverse declaration order: result set, then statement, then connection.
+                        try (final Connection connection = queryConnection;
+                                final Statement statement = queryStatement;
+                                final ResultSet resultSet = queryResult) {
+                            convertToCSV(resultSet, out);
+                        }
+
+                    } catch (final Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            });
+            session.transfer(transformed, REL_SUCCESS);
+            session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            logger.info("Transformed {}", new Object[]{original});
+        } catch (ProcessException e) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, e});
+            session.transfer(original, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Executes {@code sql} against the CSV data read from {@code rawIn}, which is exposed as table {@code CSV.A}.
+     *
+     * <p>The returned {@link ResultSet} is backed by an open statement and connection. The caller owns them
+     * and should close the result set, its statement and that statement's connection when done. The input
+     * stream must stay open while the result set is being read. If setting up or executing the query fails,
+     * the connection is closed here before the exception propagates.</p>
+     *
+     * @param rawIn CSV content; decoded as UTF-8 to match the encoding used when the result is written
+     * @param sql the SELECT statement to execute
+     * @return the open result of the query
+     * @throws SQLException if the connection cannot be created or the statement cannot be executed
+     */
+    protected static ResultSet transform(InputStream rawIn, String sql) throws SQLException {
+
+        final Reader reader = new InputStreamReader(rawIn, StandardCharsets.UTF_8);
+        final HashMap<String, Reader> inputs = new HashMap<>();
+        inputs.put("A", reader);
+
+        final Properties properties = new Properties();
+        final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
+        try {
+            final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+
+            final SchemaPlus rootSchema = calciteConnection.getRootSchema();
+            final Schema schema =
+                    new CsvSchemaFactory2(inputs)
+                            .create(rootSchema, "CSV", ImmutableMap.<String, Object>of("flavor", "TRANSLATABLE"));
+
+            calciteConnection.getRootSchema().add("CSV", schema);
+            rootSchema.add("default", schema);
+
+            final Statement statement = connection.createStatement();
+            return statement.executeQuery(sql);
+        } catch (final SQLException | RuntimeException e) {
+            // Nothing was handed to the caller, so release the connection before propagating the failure.
+            try {
+                connection.close();
+            } catch (final SQLException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Writes {@code resultSet} to {@code out} as CSV; delegates to
+     * {@link #convertToCsvStream(ResultSet, OutputStream)}.
+     */
+    protected static void convertToCSV(ResultSet resultSet, OutputStream out) throws SQLException, IOException {
+
+        convertToCsvStream(resultSet, out);
+    }
+
+    /**
+     * Writes {@code rs} to {@code outStream} as CSV without a per-row callback.
+     *
+     * @return the number of data rows written (the header row is not counted)
+     */
+    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
+        return convertToCsvStream(rs, outStream, null, null);
+    }
+
+    /**
+     * Writes a header row of column labels followed by one line per row of {@code rs}, encoded as UTF-8
+     * and terminated by {@code \n}.
+     *
+     * <p>Character-typed columns are always enclosed in double quotes, with embedded double quotes doubled.
+     * All other columns are written using {@code toString()}. SQL NULL is written as an empty field.</p>
+     *
+     * @param rs the result set to write; it is read to the end but not closed
+     * @param outStream destination of the CSV bytes; it is not closed
+     * @param recordName not used by this method
+     * @param callback invoked for each row before it is written; may be {@code null}
+     * @return the number of data rows written (the header row is not counted)
+     * @throws SQLException if reading from the result set fails
+     * @throws IOException if writing to the stream or the callback fails
+     */
+    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
+            throws SQLException, IOException {
+
+        final ResultSetMetaData meta = rs.getMetaData();
+        final int nrOfColumns = meta.getColumnCount();
+        final List<String> columnNames = new ArrayList<>(nrOfColumns);
+
+        for (int i = 1; i <= nrOfColumns; i++) {
+            // Use the label so that "AS" aliases from the SELECT statement appear in the header.
+            final String columnLabel = meta.getColumnLabel(i);
+            // Some drivers report table.column; keep only the part after the last period.
+            final int qualifierEnd = columnLabel.lastIndexOf(".");
+            columnNames.add(columnLabel.substring(qualifierEnd + 1));
+        }
+
+        // Write column names as header row
+        writeCsvLine(outStream, columnNames);
+
+        // Iterate over the rows
+        long nrOfRows = 0;
+        while (rs.next()) {
+            if (callback != null) {
+                callback.processRow(rs);
+            }
+            final List<String> rowValues = new ArrayList<>(nrOfColumns);
+            for (int i = 1; i <= nrOfColumns; i++) {
+                rowValues.add(toCsvField(rs, i, meta.getColumnType(i)));
+            }
+            // Write row values
+            writeCsvLine(outStream, rowValues);
+            nrOfRows++;
+        }
+        return nrOfRows;
+    }
+
+    /** Renders the value of one column of the current row as a CSV field. */
+    private static String toCsvField(final ResultSet rs, final int column, final int javaSqlType) throws SQLException {
+        switch (javaSqlType) {
+            case CHAR:
+            case LONGNVARCHAR:
+            case LONGVARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+            case VARCHAR: {
+                final String text = rs.getString(column);
+                if (text == null) {
+                    return "";
+                }
+                // escapeCsv returns the input unchanged when no quoting is needed and an already quoted
+                // value otherwise, so quotes are added here only in the first case.
+                final String escaped = StringEscapeUtils.escapeCsv(text);
+                return escaped.equals(text) ? "\"" + text + "\"" : escaped;
+            }
+            default: {
+                final Object value = rs.getObject(column);
+                return value == null ? "" : value.toString();
+            }
+        }
+    }
+
+    /** Writes the given fields as one comma-separated, newline-terminated UTF-8 line. */
+    private static void writeCsvLine(final OutputStream outStream, final List<String> fields) throws IOException {
+        outStream.write(StringUtils.join(fields, ",").getBytes(StandardCharsets.UTF_8));
+        outStream.write("\n".getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * An interface for callback methods which allows processing of a row during the convertToCsvStream() processing.
+     * <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.
+     * Advancing the cursor (e.g.) can cause rows to be skipped during CSV conversion.
+     */
+    public interface ResultSetRowCallback {
+        void processRow(ResultSet resultSet) throws IOException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
new file mode 100644
index 0000000..421da98
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link FilterCSVColumns}: two exercise the static {@code transform} method directly and
+ * one runs the processor through a {@link TestRunner}.
+ */
+public class TestFilterCSVColumns {
+
+    private static final Logger LOGGER;
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.FilterCSVColumns", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestFilterCSVColumns", "debug");
+        LOGGER = LoggerFactory.getLogger(TestFilterCSVColumns.class);
+    }
+
+    /** Runs a column-filtering query over US500.csv and prints the result; both resources are closed afterwards. */
+    @Test
+    public void testTransformSimple() throws InitializationException, IOException, SQLException {
+        final String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'";
+
+        final Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/US500.csv");
+
+        // The result set is closed before the stream it reads from.
+        try (InputStream in = new FileInputStream(inpath.toFile());
+                ResultSet resultSet = FilterCSVColumns.transform(in, sql)) {
+
+            final int nrofColumns = printColumnLabels(resultSet);
+
+            while (resultSet.next()) {
+                printCurrentRow(resultSet, nrofColumns);
+                System.out.println();
+            }
+        }
+    }
+
+    /** Checks that a computed, aliased column is returned with the expected value for the single matching row. */
+    @Test
+    public void testTransformCalc() throws InitializationException, IOException, SQLException {
+        final String sql = "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from CSV.A where ID=100";
+
+        final Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/Numeric.csv");
+
+        try (InputStream in = new FileInputStream(inpath.toFile());
+                ResultSet resultSet = FilterCSVColumns.transform(in, sql)) {
+
+            final int nrofColumns = printColumnLabels(resultSet);
+
+            int matchingRows = 0;
+            while (resultSet.next()) {
+                printCurrentRow(resultSet, nrofColumns);
+                final double total = resultSet.getDouble("TOTAL");
+                System.out.println();
+                assertEquals(90.75, total, 0.0001);
+                matchingRows++;
+            }
+            // Numeric.csv contains exactly one row with ID 100; without this check an empty result would pass.
+            assertEquals(1, matchingRows);
+        }
+    }
+
+    /** Runs the processor over US500_typeless.csv and prints every FlowFile routed to success. */
+    @Test
+    public void testSimpleTypeless() throws InitializationException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(FilterCSVColumns.class);
+        final String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'";
+        runner.setProperty(FilterCSVColumns.SQL_SELECT, sql);
+
+        runner.enqueue(Paths.get("src/test/resources/TestFilterCSVColumns/US500_typeless.csv"));
+        runner.run();
+
+        // Use the relationship declared by the processor under test rather than another processor's constant.
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FilterCSVColumns.REL_SUCCESS);
+        for (final MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile);
+            System.out.println(new String(flowFile.toByteArray(), StandardCharsets.UTF_8));
+        }
+    }
+
+    /** Prints the column labels of {@code resultSet} on one line and returns the number of columns. */
+    private static int printColumnLabels(final ResultSet resultSet) throws SQLException {
+        final int columnCount = resultSet.getMetaData().getColumnCount();
+        for (int i = 1; i <= columnCount; i++) {
+            System.out.print(resultSet.getMetaData().getColumnLabel(i) + " ");
+        }
+        System.out.println();
+        return columnCount;
+    }
+
+    /** Prints the values of the current row of {@code resultSet}, without a trailing line break. */
+    private static void printCurrentRow(final ResultSet resultSet, final int columnCount) throws SQLException {
+        for (int i = 1; i <= columnCount; i++) {
+            System.out.print(resultSet.getString(i) + " ");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv new file mode 100644 index 0000000..2d56bb7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv @@ -0,0 +1,5 @@ +ID:int,AMOUNT1: float,AMOUNT2:float,AMOUNT3:float +008, 10.05, 15.45, 89.99 +100, 20.25, 25.25, 45.25 +105, 20.05, 25.05, 45.05 +200, 34.05, 25.05, 75.05 \ No newline at end of file
