JingsongLi commented on a change in pull request #18:
URL: https://github.com/apache/flink-table-store/pull/18#discussion_r811545307



##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
##########
@@ -20,60 +20,132 @@
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.Projection;
 import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.utils.ProjectionUtils;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
 /** Default implementation of {@link FileStoreRead}. */
 public class FileStoreReadImpl implements FileStoreRead {
 
+    private final RowType keyType;
     private final SstFileReader.Factory sstFileReaderFactory;
     private final Comparator<RowData> keyComparator;
     private final Accumulator accumulator;
 
+    private Projection<RowData, BinaryRowData> keyProjection;

Review comment:
       Store the `int[][] projectedFields` here instead, and create a separate
`Projection` for each `createReader` call.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
##########
@@ -122,17 +132,56 @@ public Factory(
                 FileStorePathFactory pathFactory) {
             this.keyType = keyType;
             this.valueType = valueType;
+            this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
-            RowType recordType = KeyValue.schema(keyType, valueType);
-            this.readerFactory = fileFormat.createReaderFactory(recordType);
+
+            this.valueProjection = Projection.range(0, 
valueType.getFieldCount()).toNestedIndexes();
+            updateProjectedTypes();
+        }
+
+        public Factory withValueProjection(int[][] projection) {

Review comment:
       I think it is OK for `SstFileReader` to support `withKeyProjection` as well.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
##########
@@ -20,60 +20,132 @@
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.Projection;
 import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.utils.ProjectionUtils;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
 /** Default implementation of {@link FileStoreRead}. */
 public class FileStoreReadImpl implements FileStoreRead {
 
+    private final RowType keyType;
     private final SstFileReader.Factory sstFileReaderFactory;
     private final Comparator<RowData> keyComparator;
     private final Accumulator accumulator;
 
+    private Projection<RowData, BinaryRowData> keyProjection;
+
     public FileStoreReadImpl(
             RowType keyType,
             RowType valueType,
             Comparator<RowData> keyComparator,
             Accumulator accumulator,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory) {
+        this.keyType = keyType;
         this.sstFileReaderFactory =
                 new SstFileReader.Factory(keyType, valueType, fileFormat, 
pathFactory);
         this.keyComparator = keyComparator;
         this.accumulator = accumulator;
+
+        this.keyProjection = null;
     }
 
     @Override
     public void withKeyProjection(int[][] projectedFields) {
-        // TODO
-        throw new UnsupportedOperationException();
+        for (int[] projectedField : projectedFields) {
+            Preconditions.checkArgument(
+                    projectedField.length == 1,
+                    "FileStoreReadImpl currently does not support nested 
projection for keys");
+        }
+        keyProjection =
+                ProjectionUtils.newProjection(
+                        keyType, Arrays.stream(projectedFields).mapToInt(f -> 
f[0]).toArray());
     }
 
     @Override
     public void withValueProjection(int[][] projectedFields) {
-        // TODO
-        throw new UnsupportedOperationException();
+        sstFileReaderFactory.withValueProjection(projectedFields);
     }
 
     @Override
     public RecordReader createReader(BinaryRowData partition, int bucket, 
List<SstFileMeta> files)
             throws IOException {
-        return new MergeTreeReader(
-                new IntervalPartition(files, keyComparator).partition(),
-                true,
-                sstFileReaderFactory.create(partition, bucket),
-                keyComparator,
-                accumulator.copy());
+        RecordReader rawReader =

Review comment:
       If there is only one file, key projection can be supported directly.
   We can add a separate branch for that case.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
##########
@@ -122,17 +132,56 @@ public Factory(
                 FileStorePathFactory pathFactory) {
             this.keyType = keyType;
             this.valueType = valueType;
+            this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
-            RowType recordType = KeyValue.schema(keyType, valueType);
-            this.readerFactory = fileFormat.createReaderFactory(recordType);
+
+            this.valueProjection = Projection.range(0, 
valueType.getFieldCount()).toNestedIndexes();
+            updateProjectedTypes();
+        }
+
+        public Factory withValueProjection(int[][] projection) {
+            valueProjection = projection;
+            updateProjectedTypes();
+            return this;
         }
 
         public SstFileReader create(BinaryRowData partition, int bucket) {
             return new SstFileReader(
                     keyType,
-                    valueType,
+                    projectedValueType,
                     readerFactory,
                     pathFactory.createSstPathFactory(partition, bucket));
         }
+
+        private void updateProjectedTypes() {
+            int numKeyFields = keyType.getFieldCount();
+            projectedValueType = (RowType) 
Projection.of(valueProjection).project(valueType);
+
+            // see KeyValue#schema for the exact schema
+            int[][] projection = new int[numKeyFields + 2 + 
valueProjection.length][];

Review comment:
       Create a method in `KeyValue` for this:
`int[][] KeyValue.projection(int[][] keyProjection, int[][] valueProjection)`.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
##########
@@ -122,17 +132,56 @@ public Factory(
                 FileStorePathFactory pathFactory) {
             this.keyType = keyType;
             this.valueType = valueType;
+            this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
-            RowType recordType = KeyValue.schema(keyType, valueType);
-            this.readerFactory = fileFormat.createReaderFactory(recordType);
+
+            this.valueProjection = Projection.range(0, 
valueType.getFieldCount()).toNestedIndexes();
+            updateProjectedTypes();
+        }
+
+        public Factory withValueProjection(int[][] projection) {
+            valueProjection = projection;
+            updateProjectedTypes();
+            return this;
         }
 
         public SstFileReader create(BinaryRowData partition, int bucket) {
             return new SstFileReader(
                     keyType,
-                    valueType,
+                    projectedValueType,
                     readerFactory,
                     pathFactory.createSstPathFactory(partition, bucket));
         }
+
+        private void updateProjectedTypes() {

Review comment:
       Rename to `doProjection`? This method does more than just update the types.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to