Repository: calcite
Updated Branches:
  refs/heads/master e30600d63 -> d9eb43832


[CALCITE-1227] Add streaming CSV table (Zhen Wang)

Close apache/calcite#239


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/65c1cec2
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/65c1cec2
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/65c1cec2

Branch: refs/heads/master
Commit: 65c1cec22d93a83b73bbdb8c73d08d8642db46e1
Parents: e30600d
Author: zhen wang <[email protected]>
Authored: Sun May 29 11:54:33 2016 +0800
Committer: Julian Hyde <[email protected]>
Committed: Wed Jul 6 11:44:51 2016 -0700

----------------------------------------------------------------------
 example/csv/pom.xml                             |   5 +
 .../calcite/adapter/csv/CsvEnumerator.java      |  43 ++++-
 .../adapter/csv/CsvStreamEnumerator.java        |  96 +++++++++++
 .../calcite/adapter/csv/CsvStreamReader.java    | 163 +++++++++++++++++++
 .../adapter/csv/CsvStreamScannableTable.java    |  78 +++++++++
 .../adapter/csv/CsvStreamTableFactory.java      |  56 +++++++
 .../apache/calcite/adapter/csv/CsvTable.java    |   2 +-
 .../src/test/resources/model-stream-table.json  |  39 +++++
 .../src/test/resources/order-stream-table.json  |  44 +++++
 example/csv/src/test/resources/sales/SDEPTS.csv |   7 +
 10 files changed, 529 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/pom.xml
----------------------------------------------------------------------
diff --git a/example/csv/pom.xml b/example/csv/pom.xml
index a6dd736..3d9dd68 100644
--- a/example/csv/pom.xml
+++ b/example/csv/pom.xml
@@ -66,6 +66,11 @@ limitations under the License.
       <artifactId>commons-lang3</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
index 8db969c..5b942fa 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java
@@ -19,6 +19,7 @@ package org.apache.calcite.adapter.csv;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Pair;
 
 import org.apache.commons.lang3.time.FastDateFormat;
@@ -92,13 +93,22 @@ class CsvEnumerator<E> implements Enumerator<E> {
     }
   }
 
+  static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file,
+                                   List<CsvFieldType> fieldTypes) {
+    return deduceRowType(typeFactory, file, fieldTypes, false);
+  }
+
   /** Deduces the names and types of a table's columns by reading the first line
-   * of a CSV file. */
+  * of a CSV file. */
   static RelDataType deduceRowType(JavaTypeFactory typeFactory, File file,
-      List<CsvFieldType> fieldTypes) {
+      List<CsvFieldType> fieldTypes, Boolean stream) {
     final List<RelDataType> types = new ArrayList<>();
     final List<String> names = new ArrayList<>();
     CSVReader reader = null;
+    if (stream) {
+      names.add("ROWTIME");
+      types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP));
+    }
     try {
       reader = openCsv(file);
       final String[] strings = reader.readNext();
@@ -150,7 +160,7 @@ class CsvEnumerator<E> implements Enumerator<E> {
     return typeFactory.createStructType(Pair.zip(names, types));
   }
 
-  private static CSVReader openCsv(File file) throws IOException {
+  public static CSVReader openCsv(File file) throws IOException {
     final Reader fileReader;
     if (file.getName().endsWith(".gz")) {
       final GZIPInputStream inputStream =
@@ -300,13 +310,30 @@ class CsvEnumerator<E> implements Enumerator<E> {
   static class ArrayRowConverter extends RowConverter<Object[]> {
     private final CsvFieldType[] fieldTypes;
     private final int[] fields;
+    // Whether the row being converted comes from a stream.
+    private final boolean stream;
 
     ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
       this.fieldTypes = fieldTypes.toArray(new 
CsvFieldType[fieldTypes.size()]);
       this.fields = fields;
+      this.stream = false;
+    }
+
+    ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields, boolean 
stream) {
+      this.fieldTypes = fieldTypes.toArray(new 
CsvFieldType[fieldTypes.size()]);
+      this.fields = fields;
+      this.stream = stream;
     }
 
     public Object[] convertRow(String[] strings) {
+      if (stream) {
+        return convertStreamRow(strings);
+      } else {
+        return convertNormalRow(strings);
+      }
+    }
+
+    public Object[] convertNormalRow(String[] strings) {
       final Object[] objects = new Object[fields.length];
       for (int i = 0; i < fields.length; i++) {
         int field = fields[i];
@@ -314,6 +341,16 @@ class CsvEnumerator<E> implements Enumerator<E> {
       }
       return objects;
     }
+
+    public Object[] convertStreamRow(String[] strings) {
+      final Object[] objects = new Object[fields.length + 1];
+      objects[0] = System.currentTimeMillis();
+      for (int i = 0; i < fields.length; i++) {
+        int field = fields[i];
+        objects[i + 1] = convert(fieldTypes[field], strings[field]);
+      }
+      return objects;
+    }
   }
 
   /** Single column row converter. */

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
new file mode 100644
index 0000000..4609f11
--- /dev/null
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamEnumerator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import org.apache.calcite.linq4j.Enumerator;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Streaming enumerator over the rows of a CSV file.
+ * @param <E> Row type
+ */
+public class CsvStreamEnumerator<E> implements Enumerator<E> {
+  protected CsvStreamReader streamReader;
+  protected String[] filterValues;
+  protected CsvEnumerator.RowConverter<E> rowConverter;
+  protected E current;
+
+  public CsvStreamEnumerator(File file, String[] filterValues,
+    CsvEnumerator.RowConverter<E> rowConverter) {
+    this.rowConverter = rowConverter;
+    this.filterValues = filterValues;
+    try {
+      this.streamReader = new CsvStreamReader(file);
+      this.streamReader.readNext(); // skip header row
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public boolean moveNext() {
+    return true;
+  }
+
+  public E readNext() {
+    try {
+    outer:
+      for (;;) {
+        final String[] strings = streamReader.readNext();
+        if (strings == null) {
+          current = null;
+          streamReader.close();
+          return current;
+        } else {
+          if (filterValues != null) {
+            for (int i = 0; i < strings.length; i++) {
+              String filterValue = filterValues[i];
+              if (filterValue != null) {
+                if (!filterValue.equals(strings[i])) {
+                  continue outer;
+                }
+              }
+            }
+          }
+          current = rowConverter.convertRow(strings);
+          return current;
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override public E current() {
+    return readNext();
+  }
+
+  @Override public void close() {
+    try {
+      streamReader.close();
+    } catch (IOException e) {
+      throw new RuntimeException("Error closing Csv Stream reader", e);
+    }
+  }
+
+  @Override public void reset() {
+    throw new UnsupportedOperationException();
+  }
+}
+
+// End CsvStreamEnumerator.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
new file mode 100644
index 0000000..5f2cf19
--- /dev/null
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamReader.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import org.apache.commons.io.input.Tailer;
+import org.apache.commons.io.input.TailerListener;
+import org.apache.commons.io.input.TailerListenerAdapter;
+
+import au.com.bytecode.opencsv.CSVParser;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * CsvStreamReader that can read newly appended file content.
+ */
+public class CsvStreamReader implements Closeable {
+  protected CSVParser parser;
+  protected int skipLines;
+  protected Tailer tailer;
+  protected Queue<String> contentQueue;
+
+  /**
+   * The default line to start reading.
+   */
+  public static final int DEFAULT_SKIP_LINES = 0;
+
+  /**
+   * The default file monitor delay, in milliseconds.
+   */
+  public static final long DEFAULT_MONITOR_DELAY = 2000;
+
+  public CsvStreamReader(File csvFile) {
+    this(
+      csvFile,
+      CSVParser.DEFAULT_SEPARATOR,
+      CSVParser.DEFAULT_QUOTE_CHARACTER,
+      CSVParser.DEFAULT_ESCAPE_CHARACTER,
+      DEFAULT_SKIP_LINES,
+      CSVParser.DEFAULT_STRICT_QUOTES,
+      CSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE
+    );
+  }
+
+  /**
+   * Constructs a CsvStreamReader with the supplied separator and quote char.
+   *
+   * @param csvFile the file to an underlying CSV source.
+   * @param separator the delimiter to use for separating entries
+   * @param quotechar the character to use for quoted elements
+   * @param escape the character to use for escaping a separator or quote
+   * @param line the line number to skip for start reading
+   * @param strictQuotes sets if characters outside the quotes are ignored
+   * @param ignoreLeadingWhiteSpace if true, parser should ignore
+   *  white space before a quote in a field
+   */
+  public CsvStreamReader(File csvFile, char separator, char quotechar, char 
escape, int line,
+                         boolean strictQuotes, boolean 
ignoreLeadingWhiteSpace) {
+    contentQueue = new ArrayDeque<String>();
+    TailerListener listener = new CSVContentListener(contentQueue);
+    tailer = Tailer.create(csvFile, listener, DEFAULT_MONITOR_DELAY, false, 
true, 4096);
+    this.parser = new CSVParser(
+      separator,
+      quotechar,
+      escape,
+      strictQuotes,
+      ignoreLeadingWhiteSpace
+    );
+    this.skipLines = line;
+    try {
+      //wait for tailer to capture data
+      Thread.sleep(DEFAULT_MONITOR_DELAY);
+    } catch (InterruptedException e) {
+      //ignore the interruption
+    }
+  }
+
+  /**
+   * Reads the next line from the buffer and converts to a string array.
+   *
+   * @return a string array with each comma-separated element as a separate entry.
+   *
+   * @throws IOException if bad things happen during the read
+   */
+  public String[] readNext() throws IOException {
+
+    String[] result = null;
+    do {
+      String nextLine = getNextLine();
+      while (nextLine == null) {
+        try {
+          Thread.sleep(DEFAULT_MONITOR_DELAY);
+          nextLine = getNextLine();
+        } catch (InterruptedException e) {
+          return null; // should throw if still pending?
+        }
+      }
+      String[] r = parser.parseLineMulti(nextLine);
+      if (r.length > 0) {
+        if (result == null) {
+          result = r;
+        } else {
+          String[] t = new String[result.length + r.length];
+          System.arraycopy(result, 0, t, 0, result.length);
+          System.arraycopy(r, 0, t, result.length, r.length);
+          result = t;
+        }
+      }
+    } while (parser.isPending());
+    return result;
+  }
+
+  /**
+   * Takes the next queued line read from the file, or null if none is queued.
+   *
+   * @return the next line from the file without trailing newline
+   * @throws IOException
+   *             if bad things happen during the read
+   */
+  private String getNextLine() throws IOException {
+    return contentQueue.poll();
+  }
+
+  /**
+   * Closes this reader; currently a no-op (the tailer is not stopped).
+   *
+   * @throws IOException if the close fails
+   */
+  public void close() throws IOException {
+  }
+
+  /** Listener that queues each line of CSV file content it is handed. */
+  class CSVContentListener extends TailerListenerAdapter {
+    Queue<String> contentQueue;
+
+    CSVContentListener(Queue<String> contentQueue) {
+      this.contentQueue = contentQueue;
+    }
+
+    @Override public void handle(String line) {
+      this.contentQueue.add(line);
+    }
+  }
+}
+
+// End CsvStreamReader.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
new file mode 100644
index 0000000..752f8a7
--- /dev/null
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamScannableTable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+
+import java.io.File;
+import java.util.ArrayList;
+
+/**
+ * Streaming table based on a CSV file.
+ *
+ * <p>It implements the {@link ScannableTable} interface, so Calcite gets
+ * data by calling the {@link #scan(DataContext)} method.
+ */
+public class CsvStreamScannableTable extends CsvScannableTable
+    implements StreamableTable {
+  /** Creates a CsvStreamScannableTable. */
+  CsvStreamScannableTable(File file, RelProtoDataType protoRowType) {
+    super(file, protoRowType);
+  }
+
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    if (protoRowType != null) {
+      return protoRowType.apply(typeFactory);
+    }
+    if (fieldTypes == null) {
+      fieldTypes = new ArrayList<CsvFieldType>();
+      return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, 
fieldTypes, true);
+    } else {
+      return CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, file, 
null, true);
+    }
+  }
+
+  public String toString() {
+    return "CsvStreamScannableTable";
+  }
+
+  public Enumerable<Object[]> scan(DataContext root) {
+    final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
+    return new AbstractEnumerable<Object[]>() {
+      public Enumerator<Object[]> enumerator() {
+        return new CsvStreamEnumerator<Object[]>(file,
+            null, new CsvEnumerator.ArrayRowConverter(fieldTypes, fields, 
true));
+      }
+    };
+  }
+
+  @Override public Table stream() {
+    return this;
+  }
+}
+
+// End CsvStreamScannableTable.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamTableFactory.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamTableFactory.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamTableFactory.java
new file mode 100644
index 0000000..72d26a9
--- /dev/null
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvStreamTableFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.csv;
+
+import org.apache.calcite.model.ModelHandler;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TableFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Factory that creates a {@link CsvStreamScannableTable}.
+ *
+ * <p>Allows a CSV table to be included in a model.json file, even in a
+ * schema that is not based upon {@link CsvSchema}.</p>
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class CsvStreamTableFactory implements TableFactory<CsvTable> {
+  // public constructor, per factory contract
+  public CsvStreamTableFactory() {
+  }
+
+  public CsvTable create(SchemaPlus schema, String name,
+      Map<String, Object> operand, RelDataType rowType) {
+    String fileName = (String) operand.get("file");
+    File file = new File(fileName);
+    final File base =
+        (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
+    if (base != null && !file.isAbsolute()) {
+      file = new File(base, fileName);
+    }
+    final RelProtoDataType protoRowType =
+        rowType != null ? RelDataTypeImpl.proto(rowType) : null;
+    return new CsvStreamScannableTable(file, protoRowType);
+  }
+}
+
+// End CsvStreamTableFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java
----------------------------------------------------------------------
diff --git a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java
index 05faf18..e09863b 100644
--- a/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java
+++ b/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvTable.java
@@ -31,7 +31,7 @@ import java.util.List;
  */
 public abstract class CsvTable extends AbstractTable {
   protected final File file;
-  private final RelProtoDataType protoRowType;
+  protected final RelProtoDataType protoRowType;
   protected List<CsvFieldType> fieldTypes;
 
   /** Creates a CsvAbstractTable. */

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/test/resources/model-stream-table.json
----------------------------------------------------------------------
diff --git a/example/csv/src/test/resources/model-stream-table.json b/example/csv/src/test/resources/model-stream-table.json
new file mode 100644
index 0000000..62d6cba
--- /dev/null
+++ b/example/csv/src/test/resources/model-stream-table.json
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  version: '1.0',
+  defaultSchema: 'STREAM',
+  schemas: [
+    {
+      name: 'SS',
+      tables: [
+        {
+          name: 'DEPTS',
+          type: 'custom',
+          factory: 'org.apache.calcite.adapter.csv.CsvStreamTableFactory',
+          stream: {
+            stream: true
+          },
+          operand: {
+            file: 'sales/SDEPTS.csv',
+            flavor: "scannable"
+          }
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/test/resources/order-stream-table.json
----------------------------------------------------------------------
diff --git a/example/csv/src/test/resources/order-stream-table.json b/example/csv/src/test/resources/order-stream-table.json
new file mode 100644
index 0000000..8308846
--- /dev/null
+++ b/example/csv/src/test/resources/order-stream-table.json
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  version: '1.0',
+  defaultSchema: 'foodmart',
+  schemas: [
+    {
+      name: 'STREAMS',
+      tables: [ {
+        type: 'custom',
+        name: 'ORDERS',
+        stream: {
+          stream: true
+        },
+        factory: 'org.apache.calcite.test.StreamTest$OrdersStreamTableFactory'
+      } ]
+    },
+    {
+      name: 'INFINITE_STREAMS',
+      tables: [ {
+        type: 'custom',
+        name: 'ORDERS',
+        stream: {
+          stream: true
+        },
+        factory: 
'org.apache.calcite.test.StreamTest$InfiniteOrdersStreamTableFactory'
+      } ]
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/calcite/blob/65c1cec2/example/csv/src/test/resources/sales/SDEPTS.csv
----------------------------------------------------------------------
diff --git a/example/csv/src/test/resources/sales/SDEPTS.csv b/example/csv/src/test/resources/sales/SDEPTS.csv
new file mode 100644
index 0000000..b555c42
--- /dev/null
+++ b/example/csv/src/test/resources/sales/SDEPTS.csv
@@ -0,0 +1,7 @@
+DEPTNO:int,NAME:string
+10,"Sales"
+20,"Marketing"
+30,"Accounts"
+40,"40"
+50,"50"
+60,"60"

Reply via email to