[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource. This closes #5043.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35517f12 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35517f12 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35517f12 Branch: refs/heads/release-1.4 Commit: 35517f1291293f73f4466a4fdbed4296b2dd80a5 Parents: 7868ea4 Author: Fabian Hueske <[email protected]> Authored: Mon Nov 13 14:54:54 2017 +0100 Committer: Fabian Hueske <[email protected]> Committed: Thu Nov 23 00:35:58 2017 +0100 ---------------------------------------------------------------------- docs/dev/table/sourceSinks.md | 49 + flink-connectors/flink-orc/pom.xml | 87 +- .../org/apache/flink/orc/OrcRowInputFormat.java | 745 ++ .../org/apache/flink/orc/OrcTableSource.java | 455 +- .../java/org/apache/flink/orc/OrcUtils.java | 2379 ++-- .../org/apache/flink/orc/RowOrcInputFormat.java | 241 - .../apache/flink/orc/OrcRowInputFormatTest.java | 795 ++ .../apache/flink/orc/OrcTableSourceITCase.java | 134 +- .../apache/flink/orc/OrcTableSourceTest.java | 266 +- .../java/org/apache/flink/orc/OrcUtilsTest.java | 148 + .../apache/flink/orc/RowOrcInputFormatTest.java | 472 - .../test/resources/TestOrcFile.emptyFile.orc | Bin 523 -> 0 bytes .../TestOrcFile.listliststructlong.orc | Bin 845 -> 0 bytes .../src/test/resources/TestOrcFile.listlong.orc | Bin 627 -> 0 bytes .../test/resources/TestOrcFile.liststring.orc | Bin 1298 -> 0 bytes .../src/test/resources/TestOrcFile.test1.orc | Bin 1711 -> 0 bytes .../test/resources/TestOrcFile.testDate1900.dat | 10000 ----------------- .../test/resources/TestOrcFile.testDate1900.orc | Bin 30941 -> 0 bytes .../flink-orc/src/test/resources/decimal.dat | 6000 ---------- .../flink-orc/src/test/resources/decimal.orc | Bin 16337 -> 0 bytes .../src/test/resources/demo-11-none.orc | Bin 5147970 -> 0 bytes .../src/test/resources/test-data-decimal.orc | Bin 0 -> 16337 bytes .../src/test/resources/test-data-flat.orc | Bin 0 -> 408522 bytes 
.../src/test/resources/test-data-nested.orc | Bin 0 -> 1711 bytes .../src/test/resources/test-data-nestedlist.orc | Bin 0 -> 845 bytes .../src/test/resources/test-data-timetypes.orc | Bin 0 -> 30941 bytes .../flink/api/java/typeutils/RowTypeInfo.java | 17 + .../logical/FlinkLogicalTableSourceScan.scala | 16 +- .../table/plan/util/RexProgramExtractor.scala | 12 + 29 files changed, 3305 insertions(+), 18511 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/docs/dev/table/sourceSinks.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md index 0b4bdbe..7387358 100644 --- a/docs/dev/table/sourceSinks.md +++ b/docs/dev/table/sourceSinks.md @@ -47,6 +47,7 @@ A custom `TableSource` can be defined by implementing the `BatchTableSource` or | `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for Avro-encoded Kafka 0.8 topics. | `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.8 topics. | `CsvTableSource` | `flink-table` | Y | Y | A simple `TableSource` for CSV files. +| `OrcTableSource` | `flink-orc` | Y | N | A `TableSource` for ORC files. All sources that come with the `flink-table` dependency are directly available for Table API or SQL programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency. @@ -485,6 +486,54 @@ val csvTableSource = CsvTableSource {% top %} +### OrcTableSource + +The `OrcTableSource` reads [ORC files](https://orc.apache.org). ORC is a file format for structured data and stores the data in a compressed, columnar representation. ORC is very storage efficient and supports projection and filter push-down. 
+ +An `OrcTableSource` is created as shown below: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} + +// create Hadoop Configuration +Configuration config = new Configuration(); + +OrcTableSource orcTableSource = OrcTableSource.builder() + // path to ORC file(s) + .path("file:///path/to/data") + // schema of ORC files + .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>") + // Hadoop configuration + .withConfiguration(config) + // build OrcTableSource + .build(); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} + +// create Hadoop Configuration +val config = new Configuration() + +val orcTableSource = OrcTableSource.builder() + // path to ORC file(s) + .path("file:///path/to/data") + // schema of ORC files + .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>") + // Hadoop configuration + .withConfiguration(config) + // build OrcTableSource + .build() +{% endhighlight %} +</div> +</div> + +**Note:** The `OrcTableSource` does not support ORC's `Union` type yet. + +{% top %} + Provided TableSinks ------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/pom.xml b/flink-connectors/flink-orc/pom.xml index 1ac7eaa..4866cd7 100644 --- a/flink-connectors/flink-orc/pom.xml +++ b/flink-connectors/flink-orc/pom.xml @@ -40,22 +40,39 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table_${scala.binary.version}</artifactId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${project.version}</version> - <scope>compile</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${project.version}</version> - <scope>compile</scope> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-table. --> + <optional>true</optional> </dependency> <dependency> <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> - <version>1.4.0</version> + <version>1.4.1</version> + <exclusions> + <!-- Exclude ORC's Hadoop dependency and pull in Flink's shaded Hadoop. --> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Replacement for ORC's Hadoop dependency. --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${project.version}</version> + <scope>provided</scope> </dependency> <!-- test dependencies --> @@ -88,65 +105,7 @@ under the License. <scope>test</scope> <type>test-jar</type> </dependency> - </dependencies> - <build> - - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <versionRange>[2.4,)</versionRange> - <goals> - <goal>single</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-clean-plugin</artifactId> - <versionRange>[1,)</versionRange> - <goals> - <goal>clean</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <versionRange>[1.7.7,)</versionRange> - <goals> - <goal>schema</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java new file mode 100644 index 0000000..4353cbc --- /dev/null +++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java @@ -0,0 +1,745 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; 
+import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.orc.OrcUtils.fillRows; + +/** + * InputFormat to read ORC files. + */ +public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> { + + private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class); + // the default number of rows to read in a batch + private static final int DEFAULT_BATCH_SIZE = 1000; + + // the number of rows to read in a batch + private int batchSize; + // the configuration to read with + private Configuration conf; + // the schema of the ORC files to read + private TypeDescription schema; + + // the fields of the ORC schema that the returned Rows are composed of. + private int[] selectedFields; + // the type information of the Rows returned by this InputFormat. + private transient RowTypeInfo rowType; + + // the ORC reader + private transient RecordReader orcRowsReader; + // the vectorized row data to be read in a batch + private transient VectorizedRowBatch rowBatch; + // the vector of rows that is read in a batch + private transient Row[] rows; + + // the number of rows in the current batch + private transient int rowsInBatch; + // the index of the next row to return + private transient int nextRow; + + private ArrayList<Predicate> conjunctPredicates = new ArrayList<>(); + + /** + * Creates an OrcRowInputFormat. + * + * @param path The path to read ORC files from. + * @param schemaString The schema of the ORC files as String. + * @param orcConfig The configuration to read the ORC files with. 
+ */ + public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) { + this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE); + } + + /** + * Creates an OrcRowInputFormat. + * + * @param path The path to read ORC files from.
+ * @param schemaString The schema of the ORC files as String. + * @param orcConfig The configuration to read the ORC files with. + * @param batchSize The number of Row objects to read in a batch. + */ + public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig, int batchSize) { + this(path, TypeDescription.fromString(schemaString), orcConfig, batchSize); + } + + /** + * Creates an OrcRowInputFormat. + * + * @param path The path to read ORC files from. + * @param orcSchema The schema of the ORC files as ORC TypeDescription. + * @param orcConfig The configuration to read the ORC files with. + * @param batchSize The number of Row objects to read in a batch. + */ + public OrcRowInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) { + super(new Path(path)); + + // configure OrcRowInputFormat + this.schema = orcSchema; + this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema); + this.conf = orcConfig; + this.batchSize = batchSize; + + // set default selection mask, i.e., all fields. + this.selectedFields = new int[this.schema.getChildren().size()]; + for (int i = 0; i < selectedFields.length; i++) { + this.selectedFields[i] = i; + } + } + + /** + * Adds a filter predicate to reduce the number of rows to be returned by the input format. + * Multiple conjunctive predicates can be added by calling this method multiple times. + * + * <p>Note: Predicates can significantly reduce the amount of data that is read. + * However, the OrcRowInputFormat does not guarantee that all returned rows qualify the + * predicates. Moreover, predicates are only applied if the referenced field is among the + * selected fields. + * + * @param predicate The filter predicate. 
+ */ + public void addPredicate(Predicate predicate) { + // validate + validatePredicate(predicate); + // add predicate + this.conjunctPredicates.add(predicate); + } + + private void validatePredicate(Predicate pred) { + if (pred instanceof ColumnPredicate) { + // check column name + String colName = ((ColumnPredicate) pred).columnName; + if (!this.schema.getFieldNames().contains(colName)) { + throw new IllegalArgumentException("Predicate cannot be applied. " + + "Column '" + colName + "' does not exist in ORC schema."); + } + } else if (pred instanceof Not) { + validatePredicate(((Not) pred).child()); + } else if (pred instanceof Or) { + for (Predicate p : ((Or) pred).children()) { + validatePredicate(p); + } + } + } + + /** + * Selects the fields from the ORC schema that are returned by the InputFormat. + * + * @param selectedFields The indices of the fields of the ORC schema that are returned by the InputFormat. + */ + public void selectFields(int... selectedFields) { + // set field mapping + this.selectedFields = selectedFields; + // adapt result type + this.rowType = RowTypeInfo.projectFields(this.rowType, selectedFields); + } + + /** + * Computes the ORC projection mask of the fields to include from the selected fields. + * + * @return The ORC projection mask.
+ */ + private boolean[] computeProjectionMask() { + // mask with all fields of the schema + boolean[] projectionMask = new boolean[schema.getMaximumId() + 1]; + // for each selected field + for (int inIdx : selectedFields) { + // set all nested fields of a selected field to true + TypeDescription fieldSchema = schema.getChildren().get(inIdx); + for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) { + projectionMask[i] = true; + } + } + return projectionMask; + } + + @Override + public void openInputFormat() throws IOException { + super.openInputFormat(); + // create and initialize the row batch + this.rows = new Row[batchSize]; + for (int i = 0; i < batchSize; i++) { + rows[i] = new Row(selectedFields.length); + } + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + + LOG.debug("Opening ORC file {}", fileSplit.getPath()); + + // open ORC file and create reader + org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath()); + Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(conf)); + + // get offset and length for the stripes that start in the split + Tuple2<Long, Long> offsetAndLength = getOffsetAndLengthForSplit(fileSplit, getStripes(orcReader)); + + // create ORC row reader configuration + Reader.Options options = getOptions(orcReader) + .schema(schema) + .range(offsetAndLength.f0, offsetAndLength.f1) + .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) + .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); + + // configure filters + if (!conjunctPredicates.isEmpty()) { + SearchArgument.Builder b = SearchArgumentFactory.newBuilder(); + b = b.startAnd(); + for (Predicate predicate : conjunctPredicates) { + predicate.add(b); + } + b = b.end(); + options.searchArgument(b.build(), new String[]{}); + } + + // configure selected fields + 
options.include(computeProjectionMask()); + + // create ORC row reader + this.orcRowsReader = orcReader.rows(options); + + // assign ids + this.schema.getId(); + // create row batch + this.rowBatch = schema.createRowBatch(batchSize); + rowsInBatch = 0; + nextRow = 0; + } + + @VisibleForTesting + Reader.Options getOptions(Reader orcReader) { + return orcReader.options(); + } + + @VisibleForTesting + List<StripeInformation> getStripes(Reader orcReader) { + return orcReader.getStripes(); + } + + private Tuple2<Long, Long> getOffsetAndLengthForSplit(FileInputSplit split, List<StripeInformation> stripes) { + long splitStart = split.getStart(); + long splitEnd = splitStart + split.getLength(); + + long readStart = Long.MAX_VALUE; + long readEnd = Long.MIN_VALUE; + + for (StripeInformation s : stripes) { + if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) { + // stripe starts in split, so it is included + readStart = Math.min(readStart, s.getOffset()); + readEnd = Math.max(readEnd, s.getOffset() + s.getLength()); + } + } + + if (readStart < Long.MAX_VALUE) { + // at least one split is included + return Tuple2.of(readStart, readEnd - readStart); + } else { + return Tuple2.of(0L, 0L); + } + } + + @Override + public void close() throws IOException { + if (orcRowsReader != null) { + this.orcRowsReader.close(); + } + this.orcRowsReader = null; + } + + @Override + public void closeInputFormat() throws IOException { + this.rows = null; + this.rows = null; + this.schema = null; + this.rowBatch = null; + } + + @Override + public boolean reachedEnd() throws IOException { + return !ensureBatch(); + } + + /** + * Checks if there is at least one row left in the batch to return. + * If no more row are available, it reads another batch of rows. + * + * @return Returns true if there is one more row to return, false otherwise. + * @throws IOException throw if an exception happens while reading a batch. 
+ */ + private boolean ensureBatch() throws IOException { + + if (nextRow >= rowsInBatch) { + // No more rows available in the Rows array. + nextRow = 0; + // Try to read the next batch if rows from the ORC file. + boolean moreRows = orcRowsReader.nextBatch(rowBatch); + + if (moreRows) { + // Load the data into the Rows array. + rowsInBatch = fillRows(rows, schema, rowBatch, selectedFields); + } + return moreRows; + } + // there is at least one Row left in the Rows array. + return true; + } + + @Override + public Row nextRecord(Row reuse) throws IOException { + // return the next row + return rows[this.nextRow++]; + } + + @Override + public TypeInformation<Row> getProducedType() { + return rowType; + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeInt(batchSize); + this.conf.write(out); + out.writeUTF(schema.toString()); + + out.writeInt(selectedFields.length); + for (int f : selectedFields) { + out.writeInt(f); + } + + out.writeInt(conjunctPredicates.size()); + for (Predicate p : conjunctPredicates) { + out.writeObject(p); + } + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + batchSize = in.readInt(); + org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); + configuration.readFields(in); + + if (this.conf == null) { + this.conf = configuration; + } + this.schema = TypeDescription.fromString(in.readUTF()); + + this.selectedFields = new int[in.readInt()]; + for (int i = 0; i < selectedFields.length; i++) { + this.selectedFields[i] = in.readInt(); + } + + this.conjunctPredicates = new ArrayList<>(); + int numPreds = in.readInt(); + for (int i = 0; i < numPreds; i++) { + 
conjunctPredicates.add((Predicate) in.readObject()); + } + } + + // -------------------------------------------------------------------------------------------- + // Classes to define predicates + // -------------------------------------------------------------------------------------------- + + /** + * A filter predicate that can be evaluated by the OrcRowInputFormat. + */ + public abstract static class Predicate implements Serializable { + protected abstract SearchArgument.Builder add(SearchArgument.Builder builder); + } + + abstract static class ColumnPredicate extends Predicate { + final String columnName; + final PredicateLeaf.Type literalType; + + ColumnPredicate(String columnName, PredicateLeaf.Type literalType) { + this.columnName = columnName; + this.literalType = literalType; + } + + Object castLiteral(Serializable literal) { + + switch (literalType) { + case LONG: + if (literal instanceof Byte) { + return new Long((Byte) literal); + } else if (literal instanceof Short) { + return new Long((Short) literal); + } else if (literal instanceof Integer) { + return new Long((Integer) literal); + } else if (literal instanceof Long) { + return literal; + } else { + throw new IllegalArgumentException("A predicate on a LONG column requires an integer " + + "literal, i.e., Byte, Short, Integer, or Long."); + } + case FLOAT: + if (literal instanceof Float) { + return new Double((Float) literal); + } else if (literal instanceof Double) { + return literal; + } else if (literal instanceof BigDecimal) { + return ((BigDecimal) literal).doubleValue(); + } else { + throw new IllegalArgumentException("A predicate on a FLOAT column requires a floating " + + "literal, i.e., Float or Double."); + } + case STRING: + if (literal instanceof String) { + return literal; + } else { + throw new IllegalArgumentException("A predicate on a STRING column requires a floating " + + "literal, i.e., Float or Double."); + } + case BOOLEAN: + if (literal instanceof Boolean) { + return literal; + 
} else { + throw new IllegalArgumentException("A predicate on a BOOLEAN column requires a Boolean literal."); + } + case DATE: + if (literal instanceof Date) { + return literal; + } else { + throw new IllegalArgumentException("A predicate on a DATE column requires a java.sql.Date literal."); + } + case TIMESTAMP: + if (literal instanceof Timestamp) { + return literal; + } else { + throw new IllegalArgumentException("A predicate on a TIMESTAMP column requires a java.sql.Timestamp literal."); + } + case DECIMAL: + if (literal instanceof BigDecimal) { + return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) literal)); + } else { + throw new IllegalArgumentException("A predicate on a DECIMAL column requires a BigDecimal literal."); + } + default: + throw new IllegalArgumentException("Unknown literal type " + literalType); + } + } + } + + abstract static class BinaryPredicate extends ColumnPredicate { + final Serializable literal; + + BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal) { + super(columnName, literalType); + this.literal = literal; + } + } + + /** + * An EQUALS predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class Equals extends BinaryPredicate { + /** + * Creates an EQUALS predicate. + * + * @param columnName The column to check. + * @param literalType The type of the literal. + * @param literal The literal value to check the column against. + */ + public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal) { + super(columnName, literalType, literal); + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + return builder.equals(columnName, literalType, castLiteral(literal)); + } + + @Override + public String toString() { + return columnName + " = " + literal; + } + } + + /** + * An EQUALS predicate that can be evaluated with Null safety by the OrcRowInputFormat. 
+ */ + public static class NullSafeEquals extends BinaryPredicate { + /** + * Creates a null-safe EQUALS predicate. + * + * @param columnName The column to check. + * @param literalType The type of the literal. + * @param literal The literal value to check the column against. + */ + public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal) { + super(columnName, literalType, literal); + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + return builder.nullSafeEquals(columnName, literalType, castLiteral(literal)); + } + + @Override + public String toString() { + return columnName + " = " + literal; + } + } + + /** + * A LESS_THAN predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class LessThan extends BinaryPredicate { + /** + * Creates a LESS_THAN predicate. + * + * @param columnName The column to check. + * @param literalType The type of the literal. + * @param literal The literal value to check the column against. + */ + public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal) { + super(columnName, literalType, literal); + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + return builder.lessThan(columnName, literalType, castLiteral(literal)); + } + + @Override + public String toString() { + return columnName + " < " + literal; + } + } + + /** + * A LESS_THAN_EQUALS predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class LessThanEquals extends BinaryPredicate { + /** + * Creates a LESS_THAN_EQUALS predicate. + * + * @param columnName The column to check. + * @param literalType The type of the literal. + * @param literal The literal value to check the column against. 
+ */ + public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal) { + super(columnName, literalType, literal); + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + return builder.lessThanEquals(columnName, literalType, castLiteral(literal)); + } + + @Override + public String toString() { + return columnName + " <= " + literal; + } + } + + /** + * An IS_NULL predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class IsNull extends ColumnPredicate { + /** + * Creates an IS_NULL predicate. + * + * @param columnName The column to check for null. + * @param literalType The type of the column to check for null. + */ + public IsNull(String columnName, PredicateLeaf.Type literalType) { + super(columnName, literalType); + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + return builder.isNull(columnName, literalType); + } + + @Override + public String toString() { + return columnName + " IS NULL"; + } + } + + /** + * A BETWEEN predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class Between extends ColumnPredicate { + private Serializable lowerBound; + private Serializable upperBound; + + /** + * Creates a BETWEEN predicate. + * + * @param columnName The column to check. + * @param literalType The type of the literals. + * @param lowerBound The literal value of the (inclusive) lower bound to check the column against. + * @param upperBound The literal value of the (inclusive) upper bound to check the column against.
+ */ + public Between(String columnName, PredicateLeaf.Type literalType, Serializable lowerBound, Serializable upperBound) { + super(columnName, literalType); + this.lowerBound = lowerBound; + this.upperBound = upperBound; + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + return builder.between(columnName, literalType, castLiteral(lowerBound), castLiteral(upperBound)); + } + + @Override + public String toString() { + return lowerBound + " <= " + columnName + " <= " + upperBound; + } + } + + /** + * An IN predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class In extends ColumnPredicate { + private Serializable[] literals; + + /** + * Creates an IN predicate. + * + * @param columnName The column to check. + * @param literalType The type of the literals. + * @param literals The literal values to check the column against. + */ + public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals) { + super(columnName, literalType); + this.literals = literals; + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + Object[] castedLiterals = new Object[literals.length]; + for (int i = 0; i < literals.length; i++) { + castedLiterals[i] = castLiteral(literals[i]); + } + return builder.in(columnName, literalType, (Object[]) castedLiterals); + } + + @Override + public String toString() { + return columnName + " IN " + Arrays.toString(literals); + } + } + + /** + * A NOT predicate to negate a predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class Not extends Predicate { + private final Predicate pred; + + /** + * Creates a NOT predicate. + * + * @param predicate The predicate to negate. 
+ */ + public Not(Predicate predicate) { + this.pred = predicate; + } + + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + return pred.add(builder.startNot()).end(); + } + + protected Predicate child() { + return pred; + } + + @Override + public String toString() { + return "NOT(" + pred.toString() + ")"; + } + } + + /** + * An OR predicate that can be evaluated by the OrcRowInputFormat. + */ + public static class Or extends Predicate { + private final Predicate[] preds; + + /** + * Creates an OR predicate. + * + * @param predicates The disjunctive predicates. + */ + public Or(Predicate... predicates) { + this.preds = predicates; + } + + @Override + protected SearchArgument.Builder add(SearchArgument.Builder builder) { + SearchArgument.Builder withOr = builder.startOr(); + for (Predicate p : preds) { + withOr = p.add(withOr); + } + return withOr.end(); + } + + protected Iterable<Predicate> children() { + return Arrays.asList(preds); + } + + @Override + public String toString() { + return "OR(" + Arrays.toString(preds) + ")"; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java index 0454ba4..b7c5378 100644 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java +++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java @@ -18,111 +18,474 @@ package org.apache.flink.orc; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import 
org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.orc.OrcRowInputFormat.Predicate;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.IsNotNull;
+import org.apache.flink.table.expressions.IsNull;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.expressions.UnaryExpression;
 import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.orc.TypeDescription;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
- * Creates a TableSource to read ORC file.
+ * A TableSource to read ORC files.
  *
- * <p>The ORC file path and schema is passed during {@link OrcTableSource} construction. configuration is optional.
+ * <p>The {@link OrcTableSource} supports projection and filter push-down.</p>
  *
- * <p>The OrcTableSource is used as shown in the example below.
+ * <p>An {@link OrcTableSource} is used as shown in the example below.
  *
  * <pre>
  * {@code
- * String path = testInputURL.getPath();
- * String schema = "struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>"
- * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * OrcTableSource orcSrc = OrcTableSource.builder()
+ *   .path("file:///my/data/file.orc")
+ *   .forOrcSchema("struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>")
+ *   .build();
+ *
  * tEnv.registerTableSource("orcTable", orcSrc);
- * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * Table res = tEnv.sql("SELECT * FROM orcTable");
  * }
  * </pre>
  */
-public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+public class OrcTableSource
+	implements BatchTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
 
-	private String path;
-	private TypeDescription orcSchema;
-	private RowTypeInfo typeInfo;
-	private Configuration orcConfig;
-	private int[] fieldMapping;
+	private static final int DEFAULT_BATCH_SIZE = 1000;
 
-	/**
-	 * The ORC file path and schema.
-	 *
-	 * @param path the path of orc file
-	 * @param orcSchema schema of orc file
-	 */
-	public OrcTableSource(String path, String orcSchema) {
-		this(path, orcSchema, new Configuration());
-	}
+	// path to read ORC files from
+	private final String path;
+	// schema of the ORC file
+	private final TypeDescription orcSchema;
+	// the schema of the Table
+	private final TableSchema tableSchema;
+	// the configuration to read the file
+	private final Configuration orcConfig;
+	// the number of rows to read in a batch
+	private final int batchSize;
+
+	// type information of the data returned by the InputFormat
+	private final RowTypeInfo typeInfo;
+	// list of selected ORC fields to return
+	private final int[] selectedFields;
+	// list of predicates to apply
+	private final Predicate[] predicates;
 
 	/**
-	 * The file path and schema of orc file, and configuration to read orc file .
+	 * Creates an OrcTableSource from an ORC TypeDescription.
 	 *
-	 * @param path the path of orc file
-	 * @param orcSchema schema of orc file
-	 * @param orcConfig configuration to read orc file
+	 * @param path The path to read the ORC files from.
+	 * @param orcSchema The schema of the ORC files as TypeDescription.
+	 * @param orcConfig The configuration to read the ORC files.
+	 * @param batchSize The number of Rows to read in a batch, default is 1000.
 	 */
-	public OrcTableSource(String path, String orcSchema, Configuration orcConfig) {
-		this(path, TypeDescription.fromString(orcSchema), orcConfig);
+	private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) {
+		this(path, orcSchema, orcConfig, batchSize, null, null);
 	}
 
-	public OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig) {
+	private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig,
+			int batchSize, int[] selectedFields, Predicate[] predicates) {
+
+		Preconditions.checkNotNull(path, "Path must not be null.");
+		Preconditions.checkNotNull(orcSchema, "OrcSchema must not be null.");
+		Preconditions.checkNotNull(orcConfig, "Configuration must not be null.");
+		Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than zero.");
 		this.path = path;
 		this.orcSchema = orcSchema;
 		this.orcConfig = orcConfig;
+		this.batchSize = batchSize;
+		this.selectedFields = selectedFields;
+		this.predicates = predicates;
 
-		this.typeInfo = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+		// determine the type information from the ORC schema
+		RowTypeInfo typeInfoFromSchema = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
+
+		// set return type info
+		if (selectedFields == null) {
+			this.typeInfo = typeInfoFromSchema;
+		} else {
+			this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields);
+		}
+		// create a TableSchema that corresponds to the ORC schema
+		this.tableSchema = new TableSchema(
+			typeInfoFromSchema.getFieldNames(),
+			typeInfoFromSchema.getFieldTypes()
+		);
 	}
 
 	@Override
 	public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-
-		RowOrcInputFormat orcIF = new RowOrcInputFormat(path, orcSchema, orcConfig);
-		if (fieldMapping != null) {
-			orcIF.setFieldMapping(fieldMapping);
+		OrcRowInputFormat orcIF = buildOrcInputFormat();
+		if (selectedFields != null) {
+			orcIF.selectFields(selectedFields);
+		}
+		if (predicates != null) {
+			for (OrcRowInputFormat.Predicate pred : predicates) {
+				orcIF.addPredicate(pred);
+			}
 		}
 		return execEnv.createInput(orcIF);
 	}
 
+	@VisibleForTesting
+	protected OrcRowInputFormat buildOrcInputFormat() {
+		return new OrcRowInputFormat(path, orcSchema, orcConfig, batchSize);
+	}
+
 	@Override
 	public TypeInformation<Row> getReturnType() {
 		return typeInfo;
 	}
 
 	@Override
-	public TableSource<Row> projectFields(int[] fields) {
+	public TableSchema getTableSchema() {
+		return this.tableSchema;
+	}
 
-		OrcTableSource copy = new OrcTableSource(path, orcSchema, orcConfig);
+	@Override
+	public TableSource<Row> projectFields(int[] selectedFields) {
+		// create a copy of the OrcTableSource with new selected fields
+		return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, predicates);
+	}
 
-		// set field mapping
-		copy.fieldMapping = fields;
+	@Override
+	public TableSource<Row> applyPredicate(List<Expression> predicates) {
+		List<Predicate> orcPredicates = new ArrayList<>();
 
-		// adapt TypeInfo
-		TypeInformation[] fieldTypes = new TypeInformation[fields.length];
-		String[] fieldNames = new String[fields.length];
-		for (int i = 0; i < fields.length; i++) {
-			fieldTypes[i] = this.typeInfo.getTypeAt(fields[i]);
-			fieldNames[i] = this.typeInfo.getFieldNames()[fields[i]];
+		// we do not remove any predicates from the list because ORC does not fully apply predicates
+		for (Expression pred : predicates) {
+			Predicate orcPred = toOrcPredicate(pred);
+			if (orcPred != null) {
+				orcPredicates.add(orcPred);
+			}
 		}
-		copy.typeInfo = new RowTypeInfo(fieldTypes, fieldNames);
 
-		return copy;
+		return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, orcPredicates.toArray(new Predicate[]{}));
+	}
+
+	@Override
+	public boolean isFilterPushedDown() {
+		return this.predicates != null;
 	}
 
 	@Override
 	public String explainSource() {
-		return "ORC Source file at path " + this.path + " with schema " + this.orcSchema;
+		return "OrcFile[path=" + path + ", schema=" + orcSchema + ", filter=" + predicateString() + "]";
+	}
+
+	private String predicateString() {
+		if (predicates != null) {
+			return "AND(" + Arrays.toString(predicates) + ")";
+		} else {
+			return "TRUE";
+		}
+	}
+
+	// Predicate conversion for filter push-down.
+
+	private Predicate toOrcPredicate(Expression pred) {
+		if (pred instanceof Or) {
+			Predicate c1 = toOrcPredicate(((Or) pred).left());
+			Predicate c2 = toOrcPredicate(((Or) pred).right());
+			if (c1 == null || c2 == null) {
+				return null;
+			} else {
+				return new OrcRowInputFormat.Or(c1, c2);
+			}
+		} else if (pred instanceof Not) {
+			Predicate c = toOrcPredicate(((Not) pred).child());
+			if (c == null) {
+				return null;
+			} else {
+				return new OrcRowInputFormat.Not(c);
+			}
+		} else if (pred instanceof BinaryComparison) {
+
+			BinaryComparison binComp = (BinaryComparison) pred;
+
+			if (!isValid(binComp)) {
+				// not a valid predicate
+				return null;
+			}
+			PredicateLeaf.Type litType = getLiteralType(binComp);
+			if (litType == null) {
+				// unsupported literal type
+				return null;
+			}
+
+			boolean literalOnRight = literalOnRight(binComp);
+			String colName = getColumnName(binComp);
+			Serializable literal = (Serializable) getLiteral(binComp);
+
+			if (pred instanceof EqualTo) {
+				return new OrcRowInputFormat.Equals(colName, litType, literal);
+			} else if (pred instanceof NotEqualTo) {
+				return new OrcRowInputFormat.Not(
+					new OrcRowInputFormat.Equals(colName, litType, literal));
+			} else if (pred instanceof GreaterThan) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+				} else {
+					return new OrcRowInputFormat.LessThan(colName, litType, literal);
+				}
+			} else if (pred instanceof GreaterThanOrEqual) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThan(colName, litType, literal));
+				} else {
+					return new OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+				}
+			} else if (pred instanceof LessThan) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.LessThan(colName, litType, literal);
+				} else {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+				}
+			} else if (pred instanceof LessThanOrEqual) {
+				if (literalOnRight) {
+					return new OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+				} else {
+					return new OrcRowInputFormat.Not(
+						new OrcRowInputFormat.LessThan(colName, litType, literal));
+				}
+			} else {
+				// unsupported predicate
+				return null;
+			}
+		} else if (pred instanceof UnaryExpression) {
+
+			UnaryExpression unary = (UnaryExpression) pred;
+			if (!isValid(unary)) {
+				// not a valid predicate
+				return null;
+			}
+			PredicateLeaf.Type colType = toOrcType(unary.child().resultType());
+			if (colType == null) {
+				// unsupported type
+				return null;
+			}
+
+			String colName = getColumnName(unary);
+
+			if (pred instanceof IsNull) {
+				return new OrcRowInputFormat.IsNull(colName, colType);
+			} else if (pred instanceof IsNotNull) {
+				return new OrcRowInputFormat.Not(
+					new OrcRowInputFormat.IsNull(colName, colType));
+			} else {
+				// unsupported predicate
+				return null;
+			}
+		} else {
+			// unsupported predicate
+			return null;
+		}
+	}
+
+	private boolean isValid(UnaryExpression unary) {
+		return unary.child() instanceof Attribute;
+	}
+
+	private boolean isValid(BinaryComparison comp) {
+		return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
+			(comp.left() instanceof Attribute && comp.right() instanceof Literal);
+	}
+
+	private boolean literalOnRight(BinaryComparison comp) {
+		if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
+			return false;
+		} else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
+			return true;
+		} else {
+			throw new IllegalArgumentException("Invalid binary comparison.");
+		}
+	}
+
+	private String getColumnName(UnaryExpression unary) {
+		return ((Attribute) unary.child()).name();
+	}
+
+	private String getColumnName(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Attribute) comp.left()).name();
+		} else {
+			return ((Attribute) comp.right()).name();
+		}
+	}
+
+	private PredicateLeaf.Type getLiteralType(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return toOrcType(((Literal) comp.right()).resultType());
+		} else {
+			return toOrcType(((Literal) comp.left()).resultType());
+		}
+	}
+
+	private Object getLiteral(BinaryComparison comp) {
+		if (literalOnRight(comp)) {
+			return ((Literal) comp.right()).value();
+		} else {
+			return ((Literal) comp.left()).value();
+		}
+	}
+
+	private PredicateLeaf.Type toOrcType(TypeInformation<?> type) {
+		if (type == BasicTypeInfo.BYTE_TYPE_INFO ||
+			type == BasicTypeInfo.SHORT_TYPE_INFO ||
+			type == BasicTypeInfo.INT_TYPE_INFO ||
+			type == BasicTypeInfo.LONG_TYPE_INFO) {
+			return PredicateLeaf.Type.LONG;
+		} else if (type == BasicTypeInfo.FLOAT_TYPE_INFO ||
+			type == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+			return PredicateLeaf.Type.FLOAT;
+		} else if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+			return PredicateLeaf.Type.BOOLEAN;
+		} else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
+			return PredicateLeaf.Type.STRING;
+		} else if (type == SqlTimeTypeInfo.TIMESTAMP) {
+			return PredicateLeaf.Type.TIMESTAMP;
+		} else if (type == SqlTimeTypeInfo.DATE) {
+			return PredicateLeaf.Type.DATE;
+		} else if (type == BasicTypeInfo.BIG_DEC_TYPE_INFO) {
+			return PredicateLeaf.Type.DECIMAL;
+		} else {
+			// unsupported type
+			return null;
+		}
+	}
+
+	// Builder
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	/**
+	 * Constructs an {@link OrcTableSource}.
+	 */
+	public static class Builder {
+
+		private String path;
+
+		private TypeDescription schema;
+
+		private Configuration config;
+
+		private int batchSize = 0;
+
+		/**
+		 * Sets the path of the ORC file(s).
+		 *
+		 * @param path The path of the ORC file(s).
+		 * @return The builder.
+		 */
+		public Builder path(String path) {
+			Preconditions.checkNotNull(path, "Path must not be null.");
+			Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty.");
+			this.path = path;
+			return this;
+		}
+
+		/**
+		 * Sets the ORC schema of the files to read as a String.
+		 *
+		 * @param orcSchema The ORC schema of the files to read as a String.
+		 * @return The builder.
+		 */
+		public Builder forOrcSchema(String orcSchema) {
+			Preconditions.checkNotNull(orcSchema, "ORC schema must not be null.");
+			this.schema = TypeDescription.fromString(orcSchema);
+			return this;
+		}
+
+		/**
+		 * Sets the ORC schema of the files to read as a {@link TypeDescription}.
+		 *
+		 * @param orcSchema The ORC schema of the files to read as a TypeDescription.
+		 * @return The builder.
+		 */
+		public Builder forOrcSchema(TypeDescription orcSchema) {
+			Preconditions.checkNotNull(orcSchema, "ORC Schema must not be null.");
+			this.schema = orcSchema;
+			return this;
+		}
+
+		/**
+		 * Sets a Hadoop {@link Configuration} for the ORC reader. If no configuration is set,
+		 * a new {@code Configuration()} is used.
+		 *
+		 * @param config The Hadoop Configuration for the ORC reader.
+		 * @return The builder.
+		 */
+		public Builder withConfiguration(Configuration config) {
+			Preconditions.checkNotNull(config, "Configuration must not be null.");
+			this.config = config;
+			return this;
+		}
+
+		/**
+		 * Sets the number of rows that are read in a batch. If not configured, the ORC files are
+		 * read with a batch size of 1000.
+		 *
+		 * @param batchSize The number of rows that are read in a batch.
+		 * @return The builder.
+		 */
+		public Builder withBatchSize(int batchSize) {
+			Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than zero.");
+			this.batchSize = batchSize;
+			return this;
+		}
+
+		/**
+		 * Builds the OrcTableSource for this builder.
+		 *
+		 * @return The OrcTableSource for this builder.
+		 */
+		public OrcTableSource build() {
+			Preconditions.checkNotNull(this.path, "Path must not be null.");
+			Preconditions.checkNotNull(this.schema, "ORC schema must not be null.");
+			if (this.config == null) {
+				this.config = new Configuration();
+			}
+			if (this.batchSize == 0) {
+				// set default batch size
+				this.batchSize = DEFAULT_BATCH_SIZE;
+			}
+			return new OrcTableSource(this.path, this.schema, this.config, this.batchSize);
+		}
+	}
 }
