NIFI-1280 Create FilterCSVColumns Processor.

Signed-off-by: Matt Burgess <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4d5872a3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4d5872a3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4d5872a3

Branch: refs/heads/master
Commit: 4d5872a38500ed0541d7689107b34daedb5b7e34
Parents: 52cf9a7
Author: Toivo Adams <[email protected]>
Authored: Sat May 7 12:29:15 2016 +0300
Committer: Matt Burgess <[email protected]>
Committed: Tue Apr 11 19:29:04 2017 -0400

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   5 +
 .../calcite/adapter/csv/CsvEnumerator2.java     | 303 +++++++++++++++++++
 .../apache/calcite/adapter/csv/CsvSchema2.java  |  98 ++++++
 .../calcite/adapter/csv/CsvSchemaFactory2.java  |  53 ++++
 .../calcite/adapter/csv/CsvTableScan2.java      | 104 +++++++
 .../adapter/csv/CsvTranslatableTable2.java      | 121 ++++++++
 .../processors/standard/FilterCSVColumns.java   | 258 ++++++++++++++++
 .../standard/TestFilterCSVColumns.java          | 117 +++++++
 .../resources/TestFilterCSVColumns/Numeric.csv  |   5 +
 .../resources/TestFilterCSVColumns/US500.csv    |   1 +
 .../TestFilterCSVColumns/US500_typeless.csv     |   1 +
 11 files changed, 1066 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index dc77309..d410f43 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -283,6 +283,11 @@ language governing permissions and limitations under the 
License. -->
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-example-csv</artifactId>
+            <version>1.11.0</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
new file mode 100644
index 0000000..0f928ce
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.Pair;
+import org.apache.commons.lang3.time.FastDateFormat;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+
+/** Enumerator that reads from a CSV stream.
+ *
+ * @param <E> Row type
+ */
+class CsvEnumerator2<E> implements Enumerator<E> {
+  private final CSVReader reader;
+  private final String[] filterValues;
+  private final RowConverter<E> rowConverter;
+  private E current;
+
+  private static final FastDateFormat TIME_FORMAT_DATE;
+  private static final FastDateFormat TIME_FORMAT_TIME;
+  private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
+
+  static {
+    TimeZone gmt = TimeZone.getTimeZone("GMT");
+    TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
+    TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
+    TIME_FORMAT_TIMESTAMP =
+        FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
+  }
+
+  public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes) {
+    this(verifyNotNullReader(csvReader), fieldTypes, 
identityList(fieldTypes.size()));
+  }
+
+  public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes, 
int[] fields) {
+    //noinspection unchecked
+    this(csvReader, null, (RowConverter<E>) converter(fieldTypes, fields));
+  }
+
+  public CsvEnumerator2(CSVReader csvReader, String[] filterValues, 
RowConverter<E> rowConverter) {
+    this.rowConverter = rowConverter;
+    this.filterValues = filterValues;
+    this.reader = csvReader;
+  }
+
+  static public CSVReader verifyNotNullReader(CSVReader csvReader) {
+    if (csvReader==null)
+      throw new IllegalArgumentException("csvReader cannot be null");
+    return csvReader;
+  }
+
+  private static RowConverter<?> converter(List<CsvFieldType> fieldTypes,
+      int[] fields) {
+    if (fields.length == 1) {
+      final int field = fields[0];
+      return new SingleColumnRowConverter(fieldTypes.get(field), field);
+    } else {
+      return new ArrayRowConverter(fieldTypes, fields);
+    }
+  }
+
+  /** Deduces the names and types of a table's columns by reading the first 
line
+   * of a CSV stream. */
+  static public RelDataType deduceRowType(JavaTypeFactory typeFactory, 
String[] firstLine,
+      List<CsvFieldType> fieldTypes) {
+    final List<RelDataType> types = new ArrayList<>();
+    final List<String> names = new ArrayList<>();
+      for (String string : firstLine) {
+        final String name;
+        final CsvFieldType fieldType;
+        final int colon = string.indexOf(':');
+        if (colon >= 0) {
+          name = string.substring(0, colon);
+          String typeString = string.substring(colon + 1);
+          typeString = typeString.trim();
+          fieldType = CsvFieldType.of(typeString);
+          if (fieldType == null) {
+            System.out.println("WARNING: Found unknown type: "
+              + typeString + " in first line: "
+              + " for column: " + name
+              + ". Will assume the type of column is string");
+          }
+        } else {
+          name = string;
+          fieldType = null;
+        }
+        final RelDataType type;
+        if (fieldType == null) {
+          type = typeFactory.createJavaType(String.class);
+        } else {
+          type = fieldType.toType(typeFactory);
+        }
+        names.add(name);
+        types.add(type);
+        if (fieldTypes != null) {
+          fieldTypes.add(fieldType);
+        }
+      }
+
+    if (names.isEmpty()) {
+      names.add("line");
+      types.add(typeFactory.createJavaType(String.class));
+    }
+    return typeFactory.createStructType(Pair.zip(names, types));
+  }
+
+  public E current() {
+    return current;
+  }
+
+  public boolean moveNext() {
+    try {
+    outer:
+      for (;;) {
+        final String[] strings = reader.readNext();
+        if (strings == null) {
+          current = null;
+          reader.close();
+          return false;
+        }
+        if (filterValues != null) {
+          for (int i = 0; i < strings.length; i++) {
+            String filterValue = filterValues[i];
+            if (filterValue != null) {
+              if (!filterValue.equals(strings[i])) {
+                continue outer;
+              }
+            }
+          }
+        }
+        current = rowConverter.convertRow(strings);
+        return true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void close() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      throw new RuntimeException("Error closing CSV reader", e);
+    }
+  }
+
+  /** Returns an array of integers {0, ..., n - 1}. */
+  static int[] identityList(int n) {
+    int[] integers = new int[n];
+    for (int i = 0; i < n; i++) {
+      integers[i] = i;
+    }
+    return integers;
+  }
+
+  /** Row converter. */
+  abstract static class RowConverter<E> {
+    abstract E convertRow(String[] rows);
+
+    protected Object convert(CsvFieldType fieldType, String string) {
+      if (fieldType == null) {
+        return string;
+      }
+      switch (fieldType) {
+      case BOOLEAN:
+        if (string.length() == 0) {
+          return null;
+        }
+        return Boolean.parseBoolean(string);
+      case BYTE:
+        if (string.length() == 0) {
+          return null;
+        }
+        return Byte.parseByte(string);
+      case SHORT:
+        if (string.length() == 0) {
+          return null;
+        }
+        return Short.parseShort(string);
+      case INT:
+        if (string.length() == 0) {
+          return null;
+        }
+        return Integer.parseInt(string);
+      case LONG:
+        if (string.length() == 0) {
+          return null;
+        }
+        return Long.parseLong(string);
+      case FLOAT:
+        if (string.length() == 0) {
+          return null;
+        }
+        return Float.parseFloat(string);
+      case DOUBLE:
+        if (string.length() == 0) {
+          return null;
+        }
+        return Double.parseDouble(string);
+      case DATE:
+        if (string.length() == 0) {
+          return null;
+        }
+        try {
+          Date date = TIME_FORMAT_DATE.parse(string);
+          return new java.sql.Date(date.getTime());
+        } catch (ParseException e) {
+          return null;
+        }
+      case TIME:
+        if (string.length() == 0) {
+          return null;
+        }
+        try {
+          Date date = TIME_FORMAT_TIME.parse(string);
+          return new java.sql.Time(date.getTime());
+        } catch (ParseException e) {
+          return null;
+        }
+      case TIMESTAMP:
+        if (string.length() == 0) {
+          return null;
+        }
+        try {
+          Date date = TIME_FORMAT_TIMESTAMP.parse(string);
+          return new java.sql.Timestamp(date.getTime());
+        } catch (ParseException e) {
+          return null;
+        }
+      case STRING:
+      default:
+        return string;
+      }
+    }
+  }
+
+  /** Array row converter. */
+  static class ArrayRowConverter extends RowConverter<Object[]> {
+    private final CsvFieldType[] fieldTypes;
+    private final int[] fields;
+
+    ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
+      this.fieldTypes = fieldTypes.toArray(new 
CsvFieldType[fieldTypes.size()]);
+      this.fields = fields;
+    }
+
+    public Object[] convertRow(String[] strings) {
+      final Object[] objects = new Object[fields.length];
+      for (int i = 0; i < fields.length; i++) {
+        int field = fields[i];
+        objects[i] = convert(fieldTypes[field], strings[field]);
+      }
+      return objects;
+    }
+  }
+
+  /** Single column row converter. */
+  private static class SingleColumnRowConverter extends RowConverter {
+    private final CsvFieldType fieldType;
+    private final int fieldIndex;
+
+    private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) {
+      this.fieldType = fieldType;
+      this.fieldIndex = fieldIndex;
+    }
+
+    public Object convertRow(String[] strings) {
+      return convert(fieldType, strings[fieldIndex]);
+    }
+  }
+}
+
+// End CsvEnumerator2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
new file mode 100644
index 0000000..f724f79
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import java.io.Reader;
+import java.util.Map;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Schema whose tables are backed by caller-supplied character streams.
+ * Each entry of the input map becomes one table, named by the entry's key
+ * and reading from the entry's {@link Reader}.
+ */
+public class CsvSchema2 extends AbstractSchema {
+  private final Map<String, Reader> inputs;
+  private final CsvTable.Flavor flavor;
+  private Map<String, Table> tableMap;
+
+  /**
+   * Creates a CSV schema.
+   *
+   * @param inputs     Map from table name to the reader supplying that
+   *                   table's CSV content
+   * @param flavor     Kind of table to instantiate; only
+   *                   {@code TRANSLATABLE} is currently supported
+   */
+  public CsvSchema2(Map<String, Reader> inputs, CsvTable.Flavor flavor) {
+    super();
+    this.inputs = inputs;
+    this.flavor = flavor;
+  }
+
+  /**
+   * Returns the table map, building it on first use.
+   *
+   * <p>The map is cached because creating a table consumes the first line
+   * of its reader; building the tables a second time would read past the
+   * header.
+   */
+  @Override protected Map<String, Table> getTableMap() {
+    if (tableMap != null) {
+      return tableMap;
+    }
+
+    // Build a map from table name to table; each reader becomes a table.
+    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+    for (Map.Entry<String, Reader> entry : inputs.entrySet()) {
+      builder.put(entry.getKey(), createTable(entry.getValue()));
+    }
+
+    tableMap = builder.build();
+    return tableMap;
+  }
+
+  /**
+   * Creates different sub-type of table based on the "flavor" attribute.
+   *
+   * @throws AssertionError if the flavor is anything other than
+   *     {@code TRANSLATABLE}
+   */
+  private Table createTable(Reader readerx) {
+    switch (flavor) {
+    case TRANSLATABLE:
+      return new CsvTranslatableTable2(readerx, null);
+    default:
+      throw new AssertionError("Unknown flavor " + flavor);
+    }
+  }
+}
+
+// End CsvSchema2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
new file mode 100644
index 0000000..f8ec576
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import java.io.Reader;
+import java.util.Map;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+/**
+ * Factory that creates a {@link CsvSchema}.
+ *
+ * <p>Allows a custom schema to be included in a <code><i>model</i>.json</code>
+ * file.</p>
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class CsvSchemaFactory2 implements SchemaFactory {
+  final private Map<String, Reader> inputs;
+  // public constructor, per factory contract
+  public CsvSchemaFactory2(Map<String, Reader> inputs) {
+      this.inputs = inputs;
+  }
+
+  public Schema create(SchemaPlus parentSchema, String name, Map<String, 
Object> operand) {
+    String flavorName = (String) operand.get("flavor");
+    CsvTable.Flavor flavor;
+    if (flavorName == null) {
+      flavor = CsvTable.Flavor.SCANNABLE;
+    } else {
+      flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase());
+    }
+
+    return new CsvSchema2(inputs, flavor);
+  }
+}
+
+// End CsvSchemaFactory2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
new file mode 100644
index 0000000..75f013c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+import java.util.List;
+
+/**
+ * Leaf relational expression that scans a CSV stream exposed through a
+ * {@link CsvTranslatableTable2}.
+ *
+ * <p>The scan records which column ordinals of the underlying table it
+ * produces; its row type is derived from exactly those columns.</p>
+ */
+public class CsvTableScan2 extends TableScan implements EnumerableRel {
+  final CsvTranslatableTable2 csvTable;
+  final int[] fields;
+
+  /**
+   * Creates a scan in the enumerable calling convention.
+   *
+   * @param cluster  cluster this expression belongs to
+   * @param table    planner-side handle of the scanned table
+   * @param csvTable the CSV table being scanned; must not be null
+   * @param fields   ordinals of the table columns this scan produces
+   */
+  protected CsvTableScan2(RelOptCluster cluster, RelOptTable table,
+      CsvTranslatableTable2 csvTable, int[] fields) {
+    super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
+    assert csvTable != null;
+    this.fields = fields;
+    this.csvTable = csvTable;
+  }
+
+  /** Returns a new scan over the same table and columns; a scan has no
+   * inputs, and the supplied trait set is not used. */
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.isEmpty();
+    return new CsvTableScan2(getCluster(), table, csvTable, fields);
+  }
+
+  /** Adds the projected column ordinals to the explain output. */
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    final RelWriter writer = super.explainTerms(pw);
+    return writer.item("fields", Primitive.asList(fields));
+  }
+
+  /** Builds the row type from the table's fields at the projected
+   * ordinals, in projection order. */
+  @Override public RelDataType deriveRowType() {
+    final List<RelDataTypeField> tableFields =
+        table.getRowType().getFieldList();
+    final RelDataTypeFactory.FieldInfoBuilder rowTypeBuilder =
+        getCluster().getTypeFactory().builder();
+    for (int i = 0; i < fields.length; i++) {
+      rowTypeBuilder.add(tableFields.get(fields[i]));
+    }
+    return rowTypeBuilder.build();
+  }
+
+  /** Registers the CSV project-into-scan rule with the planner. */
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(CsvProjectTableScanRule.INSTANCE);
+  }
+
+  /**
+   * Generates the expression that produces this scan's rows: a call to
+   * {@code enumerable} for a {@code JsonTable}, otherwise a call to
+   * {@code project} on the {@link CsvTranslatableTable2} with the projected
+   * column ordinals.
+   */
+  public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+    final PhysType rowPhysType =
+        PhysTypeImpl.of(
+            implementor.getTypeFactory(),
+            getRowType(),
+            pref.preferArray());
+
+    final boolean jsonBacked = table instanceof JsonTable;
+    if (!jsonBacked) {
+      return implementor.result(
+          rowPhysType,
+          Blocks.toBlock(
+              Expressions.call(
+                  table.getExpression(CsvTranslatableTable2.class),
+                  "project", Expressions.constant(fields))));
+    }
+    return implementor.result(
+        rowPhysType,
+        Blocks.toBlock(
+            Expressions.call(table.getExpression(JsonTable.class),
+                "enumerable")));
+  }
+}
+
+// End CsvTableScan2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
new file mode 100644
index 0000000..bc28fdd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.QueryableTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.TranslatableTable;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+
+/**
+ * Table based on a CSV stream.
+ *
+ * <p>The header line is read from the stream when the table is constructed.
+ * A single {@link CsvEnumerator2} over the remaining records is created the
+ * first time {@link #getRowType} is called and is handed out by every call
+ * to {@link #project}; the underlying stream is therefore consumed once.</p>
+ */
+public class CsvTranslatableTable2 extends CsvTable
+    implements QueryableTable, TranslatableTable {
+
+  private final CSVReader csvReader;
+  private CsvEnumerator2<Object> csvEnumerator2;
+  private final String[] firstLine;
+
+  /** Creates a CsvTable.
+   *
+   * @param readerx      source of the CSV text; its first line is consumed
+   *                     here as the header
+   * @param protoRowType prototype row type passed to the superclass
+   * @throws RuntimeException if the header line cannot be read
+   */
+  CsvTranslatableTable2(Reader readerx, RelProtoDataType protoRowType) {
+    super(null, protoRowType);
+    this.csvReader = new CSVReader(readerx);
+    final String[] header;
+    try {
+      header = csvReader.readNext();
+    } catch (IOException e) {
+      throw new RuntimeException("csvReader.readNext() failed ", e);
+    }
+    // An empty stream yields no header; use an empty header so that row-type
+    // deduction does not fail with a NullPointerException.
+    this.firstLine = header != null ? header : new String[0];
+  }
+
+  @Override public String toString() {
+    return "CsvTranslatableTable2";
+  }
+
+  /** Returns an enumerable over the table's rows.
+   *
+   * <p>Called from generated code. The {@code fields} argument is not
+   * consulted: the shared enumerator created by {@link #getRowType} always
+   * returns all columns.
+   *
+   * @throws IllegalStateException from {@code enumerator()} if
+   *     {@link #getRowType} has not been called yet
+   */
+  public Enumerable<Object> project(final int[] fields) {
+    return new AbstractEnumerable<Object>() {
+      public Enumerator<Object> enumerator() {
+        // Checked lazily so that getRowType may still run between
+        // project() and enumerator().
+        if (csvEnumerator2 == null) {
+          throw new IllegalStateException(
+              "getRowType must be called before rows can be enumerated");
+        }
+        return csvEnumerator2;
+      }
+    };
+  }
+
+  public Expression getExpression(SchemaPlus schema, String tableName,
+      Class clazz) {
+    return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
+  }
+
+  public Type getElementType() {
+    return Object[].class;
+  }
+
+  public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+      SchemaPlus schema, String tableName) {
+    throw new UnsupportedOperationException();
+  }
+
+  public RelNode toRel(
+      RelOptTable.ToRelContext context,
+      RelOptTable relOptTable) {
+    // Request all fields.
+    final int fieldCount = relOptTable.getRowType().getFieldCount();
+    final int[] fields = CsvEnumerator2.identityList(fieldCount);
+    return new CsvTableScan2(context.getCluster(), relOptTable, this, fields);
+  }
+
+  /**
+   * Deduces the row type from the header line.
+   *
+   * <p>On the first call the column types are also collected into
+   * {@code fieldTypes}, and the shared enumerator over the remaining
+   * records is created.
+   */
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    final RelDataType rowType;
+    if (fieldTypes == null) {
+      fieldTypes = new ArrayList<CsvFieldType>();
+      rowType = CsvEnumerator2.deduceRowType(
+          (JavaTypeFactory) typeFactory, firstLine, fieldTypes);
+    } else {
+      // Types were already collected; only rebuild the struct type.
+      rowType = CsvEnumerator2.deduceRowType(
+          (JavaTypeFactory) typeFactory, firstLine, null);
+    }
+
+    if (csvEnumerator2 == null) {
+      csvEnumerator2 = new CsvEnumerator2<Object>(csvReader, fieldTypes);
+    }
+    return rowType;
+  }
+}
+
+// End CsvTranslatableTable2.java

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
new file mode 100644
index 0000000..718f462
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static java.sql.Types.CHAR;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.VARCHAR;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.calcite.adapter.csv.CsvSchemaFactory2;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.util.StopWatch;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Processor that runs a SQL select statement (via Calcite) against the CSV
+ * content of a FlowFile and replaces the content with the query result,
+ * again rendered as CSV. The incoming CSV is exposed as table {@code CSV.A}.
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"csv", "filter", "columns", "sql", "transform"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Filter out specific columns from CSV data. "
+        + "Some other transformations are also supported. "
+        + "Columns can be renamed, simple calculations performed, aggregations, etc. "
+        + "SQL select statement is used to specify how CSV data should be transformed. "
+        + "SQL statement follows standard SQL, some restrictions may apply. "
+        + "Successfully transformed CSV data is routed to the 'success' relationship. "
+        + "If transform fails, the original FlowFile is routed to the 'failure' relationship")
+public class FilterCSVColumns extends AbstractProcessor {
+
+    public static final PropertyDescriptor SQL_SELECT = new PropertyDescriptor.Builder()
+            .name("SQL select statement")
+            .description("SQL select statement specifies how CSV data should be transformed. "
+                       + "Sql select should select from CSV.A table")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The FlowFile with transformed content will be routed "
+                       + "to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for "
+                       + "example, the SQL statement contains columns not present in CSV), "
+                       + "it will be routed to this relationship")
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(SQL_SELECT);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Rewrites the content of the next FlowFile with the result of the
+     * configured SQL statement. Any failure inside the stream callback is
+     * rethrown as an {@link IOException}; a resulting
+     * {@link ProcessException} routes the original FlowFile to 'failure'.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+        final StopWatch stopWatch = new StopWatch(true);
+
+        try {
+            FlowFile transformed = session.write(original, new StreamCallback() {
+                @Override
+                public void process(final InputStream rawIn, final OutputStream out)
+                        throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        final String sql = context.getProperty(SQL_SELECT).getValue();
+                        // Read through the buffered stream, not the raw one.
+                        final ResultSet resultSet = transform(in, sql);
+                        try {
+                            convertToCSV(resultSet, out);
+                        } finally {
+                            // transform() leaves the JDBC resources open for us.
+                            closeQuery(resultSet);
+                        }
+                    } catch (final Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            });
+            session.transfer(transformed, REL_SUCCESS);
+            session.getProvenanceReporter().modifyContent(transformed,
+                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            logger.info("Transformed {}", new Object[]{original});
+        } catch (ProcessException e) {
+            logger.error("Unable to transform {} due to {}", new Object[]{original, e});
+            session.transfer(original, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Executes {@code sql} against the CSV data read from {@code rawIn},
+     * which is registered as table {@code A} in schema {@code CSV}.
+     *
+     * <p>The returned {@link ResultSet} is still open: its statement and
+     * connection are deliberately NOT closed here, because closing the
+     * connection before the rows are read invalidates the result. The caller
+     * owns them and should close the result set, its statement and that
+     * statement's connection when done. If this method fails, everything it
+     * opened is closed before the exception propagates.
+     *
+     * @param rawIn CSV input, decoded as UTF-8
+     * @param sql   select statement to run
+     * @return the open result of the query
+     * @throws SQLException if the connection cannot be set up or the query fails
+     */
+    protected static ResultSet transform(InputStream rawIn, String sql) throws SQLException {
+
+        // Decode explicitly as UTF-8 to match the encoding used for output.
+        final Reader csvReader = new InputStreamReader(rawIn, StandardCharsets.UTF_8);
+        final HashMap<String, Reader> inputs = new HashMap<>();
+        inputs.put("A", csvReader);
+
+        final Properties properties = new Properties();
+        final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
+        Statement statement = null;
+        boolean handedOver = false;
+        try {
+            final CalciteConnection calciteConnection =
+                    connection.unwrap(CalciteConnection.class);
+
+            final SchemaPlus rootSchema = calciteConnection.getRootSchema();
+            final Schema schema =
+              new CsvSchemaFactory2(inputs)
+                  .create(rootSchema, "CSV",
+                          ImmutableMap.<String, Object>of("flavor", "TRANSLATABLE"));
+
+            calciteConnection.getRootSchema().add("CSV", schema);
+            rootSchema.add("default", schema);
+
+            statement = connection.createStatement();
+            final ResultSet resultSet = statement.executeQuery(sql);
+            handedOver = true;
+            return resultSet;
+        } finally {
+            if (!handedOver) {
+                // Nothing was returned, so nobody else can clean up.
+                try {
+                    if (statement != null) {
+                        statement.close();
+                    }
+                } finally {
+                    connection.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes a result set obtained from {@link #transform(InputStream, String)}
+     * together with the statement and connection behind it.
+     */
+    private static void closeQuery(final ResultSet resultSet) throws SQLException {
+        // Look both up first: they cannot be queried once things are closed.
+        final Statement statement = resultSet.getStatement();
+        final Connection connection = statement == null ? null : statement.getConnection();
+        try {
+            resultSet.close();
+        } finally {
+            try {
+                if (statement != null) {
+                    statement.close();
+                }
+            } finally {
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes {@code resultSet} to {@code out} as CSV (header row first).
+     */
+    protected static void convertToCSV(ResultSet resultSet, OutputStream out)
+            throws SQLException, IOException {
+
+        convertToCsvStream(resultSet, out);
+    }
+
+    /**
+     * Same as the four-argument variant, without record name or callback.
+     *
+     * @return the number of data rows written
+     */
+    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream)
+            throws SQLException, IOException {
+        return convertToCsvStream(rs, outStream, null, null);
+    }
+
+    /**
+     * Renders a result set as UTF-8 CSV: one header row with the column
+     * names, then one line per row, each terminated by {@code \n}.
+     *
+     * <p>Values of character column types are always enclosed in double
+     * quotes, with embedded quotes doubled; all other values are written via
+     * {@code toString()}. SQL NULL is written as an empty field.
+     *
+     * @param rs         result set to drain; the cursor is advanced to the end
+     * @param outStream  destination; not closed by this method
+     * @param recordName not used; kept so existing callers keep compiling
+     * @param callback   invoked for every row before it is written; may be null
+     * @return the number of data rows written
+     */
+    public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream,
+            String recordName, ResultSetRowCallback callback)
+            throws SQLException, IOException {
+
+        final ResultSetMetaData meta = rs.getMetaData();
+        final int nrOfColumns = meta.getColumnCount();
+        List<String> columnNames = new ArrayList<>(nrOfColumns);
+
+        for (int i = 1; i <= nrOfColumns; i++) {
+            String columnNameFromMeta = meta.getColumnName(i);
+            // A name may come back qualified as table.column; keep only the
+            // part after the last period.
+            int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
+            columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
+        }
+
+        // Write column names as header row
+        outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
+        outStream.write("\n".getBytes(StandardCharsets.UTF_8));
+
+        // Iterate over the rows
+        long nrOfRows = 0;
+        while (rs.next()) {
+            if (callback != null) {
+                callback.processRow(rs);
+            }
+            List<String> rowValues = new ArrayList<>(nrOfColumns);
+            for (int i = 1; i <= nrOfColumns; i++) {
+                switch (meta.getColumnType(i)) {
+                    case CHAR:
+                    case LONGNVARCHAR:
+                    case LONGVARCHAR:
+                    case NCHAR:
+                    case NVARCHAR:
+                    case VARCHAR:
+                        rowValues.add(quoteText(rs.getString(i)));
+                        break;
+                    default: {
+                        final Object value = rs.getObject(i);
+                        rowValues.add(value == null ? "" : value.toString());
+                    }
+                }
+            }
+            // Write row values
+            outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
+            outStream.write("\n".getBytes(StandardCharsets.UTF_8));
+            nrOfRows++;
+        }
+        return nrOfRows;
+    }
+
+    /**
+     * Returns {@code text} as a double-quoted CSV field, or an empty field
+     * for null.
+     */
+    private static String quoteText(final String text) {
+        if (text == null) {
+            return "";
+        }
+        final String escaped = StringEscapeUtils.escapeCsv(text);
+        // escapeCsv returns its input unchanged when no quoting is needed and
+        // a fully quoted field otherwise; only the former still needs quotes.
+        return escaped.equals(text) ? "\"" + text + "\"" : escaped;
+    }
+
+    /**
+     * Callback that is given every row while a result set is being written
+     * by {@code convertToCsvStream}.
+     * <b>IMPORTANT:</b> Implementations should only work on the row the
+     * ResultSet currently points at. Advancing the cursor (e.g.) can cause
+     * rows to be skipped in the CSV output.
+     */
+    public interface ResultSetRowCallback {
+        void processRow(ResultSet resultSet) throws IOException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
new file mode 100644
index 0000000..421da98
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link FilterCSVColumns}: two exercise the static
+ * {@code transform} helper directly, one runs the processor through a
+ * {@link TestRunner}.
+ */
+public class TestFilterCSVColumns {
+
+    private static final Logger LOGGER;
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty(
+                "org.slf4j.simpleLogger.log.nifi.processors.standard.FilterCSVColumns",
+                "debug");
+        System.setProperty(
+                "org.slf4j.simpleLogger.log.nifi.processors.standard.TestFilterCSVColumns",
+                "debug");
+        LOGGER = LoggerFactory.getLogger(TestFilterCSVColumns.class);
+    }
+
+    /** Prints the column labels on one line and returns the column count. */
+    private static int printColumnLabels(final ResultSet resultSet) throws SQLException {
+        final int nrofColumns = resultSet.getMetaData().getColumnCount();
+        for (int i = 1; i <= nrofColumns; i++) {
+            System.out.print(resultSet.getMetaData().getColumnLabel(i) + "      ");
+        }
+        System.out.println();
+        return nrofColumns;
+    }
+
+    /** Prints the values of the current row, without a trailing newline. */
+    private static void printRowValues(final ResultSet resultSet, final int nrofColumns)
+            throws SQLException {
+        for (int i = 1; i <= nrofColumns; i++) {
+            System.out.print(resultSet.getString(i)+ "  ");
+        }
+    }
+
+    @Test
+    public void testTransformSimple() throws InitializationException, IOException, SQLException {
+        String sql = "select first_name, last_name, company_name, address, "
+                + "city from CSV.A where city='New York'";
+
+        Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/US500.csv");
+        // Close the file even when the query or the iteration fails.
+        try (InputStream in = new FileInputStream(inpath.toFile())) {
+            ResultSet resultSet = FilterCSVColumns.transform(in, sql);
+
+            int nrofColumns = printColumnLabels(resultSet);
+
+            while (resultSet.next()) {
+                printRowValues(resultSet, nrofColumns);
+                System.out.println();
+            }
+        }
+    }
+
+    @Test
+    public void testTransformCalc() throws InitializationException, IOException, SQLException {
+        String sql = "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from CSV.A "
+                + "where ID=100";
+
+        Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/Numeric.csv");
+        try (InputStream in = new FileInputStream(inpath.toFile())) {
+            ResultSet resultSet = FilterCSVColumns.transform(in, sql);
+
+            int nrofColumns = printColumnLabels(resultSet);
+
+            int rows = 0;
+            while (resultSet.next()) {
+                printRowValues(resultSet, nrofColumns);
+                double total = resultSet.getDouble("TOTAL");
+                System.out.println();
+                assertEquals(90.75, total, 0.0001);
+                rows++;
+            }
+            // Numeric.csv holds exactly one row with ID 100; without this
+            // check an empty result would pass silently.
+            assertEquals(1, rows);
+        }
+    }
+
+    @Test
+    public void testSimpleTypeless() throws InitializationException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(FilterCSVColumns.class);
+        String sql = "select first_name, last_name, company_name, address, "
+                + "city from CSV.A where city='New York'";
+        runner.setProperty(FilterCSVColumns.SQL_SELECT, sql);
+
+        runner.enqueue(Paths.get("src/test/resources/TestFilterCSVColumns/US500_typeless.csv"));
+        runner.run();
+
+        // The single input must have been transformed, not routed to failure.
+        runner.assertAllFlowFilesTransferred(FilterCSVColumns.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> flowFiles =
+                runner.getFlowFilesForRelationship(FilterCSVColumns.REL_SUCCESS);
+        for (final MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile);
+            System.out.println(new String(flowFile.toByteArray()));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d5872a3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
new file mode 100644
index 0000000..2d56bb7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv
@@ -0,0 +1,5 @@
+ID:int,AMOUNT1: float,AMOUNT2:float,AMOUNT3:float
+008, 10.05, 15.45, 89.99
+100, 20.25, 25.25, 45.25
+105, 20.05, 25.05, 45.05
+200, 34.05, 25.05, 75.05
\ No newline at end of file

Reply via email to