nsivabalan commented on a change in pull request #2400:
URL: https://github.com/apache/hudi/pull/2400#discussion_r552072132



##########
File path: docker/demo/config/test-suite/complex-dag-cow.yaml
##########
@@ -14,41 +14,47 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: cow-long-running-example.yaml
-dag_rounds: 2
+dag_rounds: 1

Review comment:
       Please ignore these two files for now:
   complex-dag-cow.yaml
   and complex-dag-mor.yaml.
   I have yet to fix them.

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
##########
@@ -111,17 +111,19 @@ public void execute(ExecutionContext context) throws 
Exception {
       throw new AssertionError("Hudi contents does not match contents input 
data. ");
     }
 
-    String database = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
-    String tableName = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
-    log.warn("Validating hive table with db : " + database + " and table : " + 
tableName);
-    Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + 
tableName);
-    Dataset<Row> trimmedCowDf = 
cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
-    intersectionDf = inputSnapshotDf.intersect(trimmedDf);
-    // the intersected df should be same as inputDf. if not, there is some 
mismatch.
-    if (inputSnapshotDf.except(intersectionDf).count() != 0) {
-      log.error("Data set validation failed for COW hive table. Total count in 
hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
-      throw new AssertionError("Hudi hive table contents does not match 
contents input data. ");
+    if (config.isValidateHive()) {

Review comment:
       No changes here except adding this `if` condition.

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
##########
@@ -41,21 +46,17 @@
   private GenericRecord lastRecord;
   // Partition path field name
   private Set<String> partitionPathFieldNames;
-  private String firstPartitionPathField;
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, 
String schema) {
     this(maxEntriesToProduce, 
GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
   }
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int 
minPayloadSize, String schemaStr,
-      List<String> partitionPathFieldNames, int numPartitions) {
+      List<String> partitionPathFieldNames, int partitionIndex) {

Review comment:
       All changes in this file are part of reverting commit
e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -127,9 +127,13 @@ public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, 
JavaSparkContext jsc, Sp
     return ws;
   }
 
+  public int getBatchId() {
+    return this.batchId;
+  }
+
   public JavaRDD<GenericRecord> generateInserts(Config operation) {
     int numPartitions = operation.getNumInsertPartitions();
-    long recordsPerPartition = operation.getNumRecordsInsert();
+    long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;

Review comment:
       These changes are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -140,7 +144,7 @@ public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, 
JavaSparkContext jsc, Sp
     JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, 
numPartitions)
         .mapPartitionsWithIndex((index, p) -> {
           return new LazyRecordGeneratorIterator(new 
FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
-              minPayloadSize, schemaStr, partitionPathFieldNames, 
numPartitions));
+            minPayloadSize, schemaStr, partitionPathFieldNames, 
(Integer)index));

Review comment:
       These changes are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
##########
@@ -46,9 +46,9 @@
  */
 public class GenericRecordFullPayloadGenerator implements Serializable {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
+  private static Logger LOG = 
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);

Review comment:
       All changes in this file are part of reverting commit
e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: 
hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
##########
@@ -134,38 +133,4 @@ public void testComplexPayloadWithLargeMinSize() throws 
Exception {
     assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize + 
0.1 * minPayloadSize);
   }
 
-  @Test

Review comment:
       These changes are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -307,7 +307,10 @@ public void refreshTimeline() throws IOException {
           if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
             resumeCheckpointStr = 
Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
           }
-        }  else if 
(HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+        } else if (commitMetadata.getOperationType() == 
WriteOperationType.CLUSTER) {
+          // incase of CLUSTER commit, no checkpoint will be available in 
metadata.

Review comment:
       @n3nash @satishkotha: after clustering, when we call readFromSource in
DeltaStreamer, execution was going into lines 314 to 316. Hence I have added this
condition to return an empty checkpoint. Can you confirm this looks OK?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to