nsivabalan commented on a change in pull request #2400:
URL: https://github.com/apache/hudi/pull/2400#discussion_r552072132
##########
File path: docker/demo/config/test-suite/complex-dag-cow.yaml
##########
@@ -14,41 +14,47 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-long-running-example.yaml
-dag_rounds: 2
+dag_rounds: 1
Review comment:
Please skip reviewing these two files for now:
complex-dag-cow.yaml
and complex-dag-mor.yaml
I have yet to fix them.
##########
File path:
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
##########
@@ -111,17 +111,19 @@ public void execute(ExecutionContext context) throws
Exception {
throw new AssertionError("Hudi contents does not match contents input
data. ");
}
- String database =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
- String tableName =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
- log.warn("Validating hive table with db : " + database + " and table : " +
tableName);
- Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." +
tableName);
- Dataset<Row> trimmedCowDf =
cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
- intersectionDf = inputSnapshotDf.intersect(trimmedDf);
- // the intersected df should be same as inputDf. if not, there is some
mismatch.
- if (inputSnapshotDf.except(intersectionDf).count() != 0) {
- log.error("Data set validation failed for COW hive table. Total count in
hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
- throw new AssertionError("Hudi hive table contents does not match
contents input data. ");
+ if (config.isValidateHive()) {
Review comment:
no changes except adding this if condition
##########
File path:
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
##########
@@ -41,21 +46,17 @@
private GenericRecord lastRecord;
// Partition path field name
private Set<String> partitionPathFieldNames;
- private String firstPartitionPathField;
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce,
String schema) {
this(maxEntriesToProduce,
GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
}
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int
minPayloadSize, String schemaStr,
- List<String> partitionPathFieldNames, int numPartitions) {
+ List<String> partitionPathFieldNames, int partitionIndex) {
Review comment:
all changes in this file are part of reverting
e33a8f733c4a9a94479c166ad13ae9d53142cd3f
##########
File path:
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -127,9 +127,13 @@ public DeltaGenerator(DFSDeltaConfig deltaOutputConfig,
JavaSparkContext jsc, Sp
return ws;
}
+ public int getBatchId() {
+ return this.batchId;
+ }
+
public JavaRDD<GenericRecord> generateInserts(Config operation) {
int numPartitions = operation.getNumInsertPartitions();
- long recordsPerPartition = operation.getNumRecordsInsert();
+ long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
Review comment:
these are part of reverting e33a8f733c4a9a94479c166ad13ae9d53142cd3f
##########
File path:
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -140,7 +144,7 @@ public DeltaGenerator(DFSDeltaConfig deltaOutputConfig,
JavaSparkContext jsc, Sp
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes,
numPartitions)
.mapPartitionsWithIndex((index, p) -> {
return new LazyRecordGeneratorIterator(new
FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
- minPayloadSize, schemaStr, partitionPathFieldNames,
numPartitions));
+ minPayloadSize, schemaStr, partitionPathFieldNames,
(Integer)index));
Review comment:
these are part of reverting e33a8f733c4a9a94479c166ad13ae9d53142cd3f
##########
File path:
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
##########
@@ -46,9 +46,9 @@
*/
public class GenericRecordFullPayloadGenerator implements Serializable {
- private static final Logger LOG =
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
+ private static Logger LOG =
LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
Review comment:
all changes in this file are part of reverting
e33a8f733c4a9a94479c166ad13ae9d53142cd3f
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
##########
@@ -134,38 +133,4 @@ public void testComplexPayloadWithLargeMinSize() throws
Exception {
assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize +
0.1 * minPayloadSize);
}
- @Test
Review comment:
these are part of reverting e33a8f733c4a9a94479c166ad13ae9d53142cd3f
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -307,7 +307,10 @@ public void refreshTimeline() throws IOException {
if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
resumeCheckpointStr =
Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
}
- } else if
(HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ } else if (commitMetadata.getOperationType() ==
WriteOperationType.CLUSTER) {
+ // incase of CLUSTER commit, no checkpoint will be available in
metadata.
Review comment:
@n3nash @satishkotha: after clustering, when we call readFromSource in
deltaStreamer, execution was going into lines 314 to 316. Hence I have added this
condition to return an empty checkpoint. Can you confirm this looks OK?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]