ravipesala commented on a change in pull request #3177: 
[CARBONDATA-3337][CARBONDATA-3306] Distributed index server
URL: https://github.com/apache/carbondata/pull/3177#discussion_r279622309
 
 

 ##########
 File path: 
core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
 ##########
 @@ -16,62 +16,115 @@
  */
 package org.apache.carbondata.core.datamap;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
 
 /**
  * Input format for datamaps, it makes the datamap pruning distributable.
  */
-public class DistributableDataMapFormat extends FileInputFormat<Void, 
ExtendedBlocklet> implements
-    Serializable {
+public class DistributableDataMapFormat extends FileInputFormat<Void, 
ExtendedBlocklet>
+    implements Serializable, Writable {
+
+  private static final transient Logger LOGGER =
+      
LogServiceFactory.getLogService(DistributableDataMapFormat.class.getName());
+
+  private static final long serialVersionUID = 9189779090091151248L;
 
   private CarbonTable table;
 
+  private FilterResolverIntf filterResolverIntf;
+
   private DataMapExprWrapper dataMapExprWrapper;
 
   private List<Segment> validSegments;
 
-  private List<Segment> invalidSegments;
+  private List<String> invalidSegments;
 
   private List<PartitionSpec> partitions;
 
-  private  DataMapDistributableWrapper distributable;
-
   private boolean isJobToClearDataMaps = false;
 
-  DistributableDataMapFormat(CarbonTable table, DataMapExprWrapper 
dataMapExprWrapper,
-      List<Segment> validSegments, List<Segment> invalidSegments, 
List<PartitionSpec> partitions,
-      boolean isJobToClearDataMaps) {
+  private ReadCommittedScope readCommittedScope;
+
+  private DataMapLevel dataMapLevel;
+
+  private boolean isFallbackJob = false;
+
+  DistributableDataMapFormat() {
+
+  }
+
+  DistributableDataMapFormat(CarbonTable table,
+      List<Segment> validSegments, List<String> invalidSegments, boolean 
isJobToClearDataMaps,
+      DataMapLevel dataMapLevel) throws IOException {
+    this(table, null, validSegments, invalidSegments, null,
+        isJobToClearDataMaps, dataMapLevel, false);
+  }
+
+  DistributableDataMapFormat(CarbonTable table, FilterResolverIntf 
filterResolverIntf,
+      List<Segment> validSegments, List<String> invalidSegments, 
List<PartitionSpec> partitions,
+      boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean 
isFallbackJob)
+      throws IOException {
     this.table = table;
-    this.dataMapExprWrapper = dataMapExprWrapper;
+    this.filterResolverIntf = filterResolverIntf;
     this.validSegments = validSegments;
     this.invalidSegments = invalidSegments;
     this.partitions = partitions;
+    if (!validSegments.isEmpty()) {
+      this.readCommittedScope = validSegments.get(0).getReadCommittedScope();
+    }
     this.isJobToClearDataMaps = isJobToClearDataMaps;
+    this.dataMapLevel = dataMapLevel;
+    this.isFallbackJob = isFallbackJob;
+    generateDataMapExpr();
+  }
+
+  private void generateDataMapExpr() throws IOException {
+    if (dataMapLevel == null) {
+      this.dataMapExprWrapper = DataMapChooser.getDefaultDataMap(table, 
filterResolverIntf);
 
 Review comment:
   I think there is no need to create dataMapExprWrapper here and serialize it. 
It would be better to create it on the executor side.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to