Jackie-Jiang commented on a change in pull request #7013:
URL: https://github.com/apache/incubator-pinot/pull/7013#discussion_r645160191



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
##########
@@ -40,60 +41,39 @@
  * A Collector implementation for collecting and concatenating all incoming 
rows.
  */
 public class ConcatCollector implements Collector {
-  private static final String RECORD_OFFSET_FILE_NAME = "record.offset";
-  private static final String RECORD_DATA_FILE_NAME = "record.data";
-
-  private final List<FieldSpec> _fieldSpecs = new ArrayList<>();
   private final int _numSortColumns;
   private final SortOrderComparator _sortOrderComparator;
   private final File _workingDir;
-  private final File _recordOffsetFile;
-  private final File _recordDataFile;
+  private final GenericRowFileManager _recordFileManager;
 
   private GenericRowFileWriter _recordFileWriter;
   private GenericRowFileReader _recordFileReader;
   private int _numDocs;
 
   public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
     List<String> sortOrder = collectorConfig.getSortOrder();
+    List<FieldSpec> fieldSpecs;
     if (CollectionUtils.isNotEmpty(sortOrder)) {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
       _numSortColumns = sortOrder.size();
-      DataType[] sortColumnStoredTypes = new DataType[_numSortColumns];
-      for (int i = 0; i < _numSortColumns; i++) {
-        String sortColumn = sortOrder.get(i);
-        FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
-        Preconditions.checkArgument(fieldSpec != null, "Failed to find sort 
column: %s", sortColumn);
-        Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot 
sort on MV column: %s", sortColumn);
-        sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType();
-        _fieldSpecs.add(fieldSpec);
-      }
-      _sortOrderComparator = new SortOrderComparator(_numSortColumns, 
sortColumnStoredTypes);
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn() && 
!sortOrder.contains(fieldSpec.getName())) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
+      _sortOrderComparator = 
SegmentProcessingUtils.getSortOrderComparator(fieldSpecs, _numSortColumns);
     } else {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
       _numSortColumns = 0;
       _sortOrderComparator = null;
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn()) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
     }
 
     _workingDir =
         new File(FileUtils.getTempDirectory(), 
String.format("concat_collector_%d", System.currentTimeMillis()));
     Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s 
for %s with config: %s",
         _workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(), 
collectorConfig);
-    _recordOffsetFile = new File(_workingDir, RECORD_OFFSET_FILE_NAME);
-    _recordDataFile = new File(_workingDir, RECORD_DATA_FILE_NAME);
 
+    // TODO: Pass 'includeNullFields' from the config

Review comment:
      If null value handling is not enabled in the table config, we don't
need to include null fields — consider wiring 'includeNullFields' to the table
config's null-handling setting instead of hard-coding it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to