codope commented on code in PR #8445:
URL: https://github.com/apache/hudi/pull/8445#discussion_r1202564227
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java:
##########
@@ -198,10 +200,10 @@ public void testWriteReadHFileWithMetaFields(boolean
populateMetaFields, boolean
}
}
+ @Disabled("Disable the test with evolved schema for HFile since it's not
supported")
+ @ParameterizedTest
@Override
- @Test
- public void testWriteReadWithEvolvedSchema() throws Exception {
- // Disable the test with evolved schema for HFile since it's not supported
+ public void testWriteReadWithEvolvedSchema(String evolvedSchemaPath) throws
Exception {
Review Comment:
   Should we just remove this test for now? There is already a tracking Jira for the missing HFile schema-evolution support.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java:
##########
@@ -114,6 +114,9 @@ public class TestSparkHoodieHBaseIndex extends
SparkClientFunctionalTestHarness
@BeforeAll
public static void init() throws Exception {
// Initialize HbaseMiniCluster
+ System.setProperty("zookeeper.preAllocSize", "100");
+ System.setProperty("zookeeper.maxCnxns", "60");
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
Review Comment:
   Why do we need these ZooKeeper configs in this PR? They don't seem related to the change being made.
##########
hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java:
##########
@@ -70,7 +71,7 @@ private static Stream<Arguments> bulkInsertTypeParams() {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testDataSourceWriter(boolean populateMetaFields) throws
Exception {
- testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP,
populateMetaFields);
+ testDataSourceWriterInternal(Collections.emptyMap(),
Collections.emptyMap(), populateMetaFields);
Review Comment:
   Just asking — does it make any difference? I believe `Collections.EMPTY_MAP` and `Collections.emptyMap()` are the same thing, right? (Though `emptyMap()` is type-safe and avoids the raw-type warning.)
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java:
##########
@@ -159,8 +159,8 @@ public void testAppendKafkaOffsetsSourceFormatAdapter()
throws IOException {
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(),
props, jsc()), props, jsc(), new ArrayList<>());
props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class",
ByteArrayDeserializer.class.getName());
- int numPartitions = 3;
- int numMessages = 15;
+ int numPartitions = 2;
+ int numMessages = 30;
Review Comment:
   Same here — why change these values? Is it to reduce the test time (fewer partitions mean less I/O)?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java:
##########
@@ -308,30 +309,35 @@ public boolean upsertAndCommit(String
baseTableInstantTime, Option commitedInsta
@Test
public void testAppendKafkaOffset() {
final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
- int numPartitions = 3;
- int numMessages = 15;
+ int numPartitions = 2;
+ int numMessages = 30;
testUtils.createTopic(topic, numPartitions);
sendMessagesToKafka(topic, numMessages, numPartitions);
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
- Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(),
Long.MAX_VALUE).getBatch().get();
- assertEquals(numMessages, c.count());
- List<String> columns =
Arrays.stream(c.columns()).collect(Collectors.toList());
+ Dataset<Row> dfNoOffsetInfo =
kafkaSource.fetchNewDataInRowFormat(Option.empty(),
Long.MAX_VALUE).getBatch().get().cache();
+ assertEquals(numMessages, dfNoOffsetInfo.count());
+ List<String> columns =
Arrays.stream(dfNoOffsetInfo.columns()).collect(Collectors.toList());
props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider,
metrics);
kafkaSource = new SourceFormatAdapter(jsonSource);
- Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(),
Long.MAX_VALUE).getBatch().get();
- assertEquals(numMessages, d.count());
+ Dataset<Row> dfWithOffsetInfo =
kafkaSource.fetchNewDataInRowFormat(Option.empty(),
Long.MAX_VALUE).getBatch().get().cache();
+ assertEquals(numMessages, dfWithOffsetInfo.count());
for (int i = 0; i < numPartitions; i++) {
- assertEquals(numMessages / numPartitions,
d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
+ assertEquals(numMessages / numPartitions,
dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + i).count());
}
- assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN,
KAFKA_SOURCE_TIMESTAMP_COLUMN).except(c).count());
- List<String> withKafkaOffsetColumns =
Arrays.stream(d.columns()).collect(Collectors.toList());
+ assertEquals(0, dfWithOffsetInfo
+ .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN,
KAFKA_SOURCE_TIMESTAMP_COLUMN)
+ .except(dfNoOffsetInfo).count());
+ List<String> withKafkaOffsetColumns =
Arrays.stream(dfWithOffsetInfo.columns()).collect(Collectors.toList());
assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
assertEquals(appendList,
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3,
withKafkaOffsetColumns.size()));
+
+ dfNoOffsetInfo.unpersist();
+ dfWithOffsetInfo.unpersist();
Review Comment:
+1
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1946,8 +1947,8 @@ public void testJsonKafkaDFSSource() throws Exception {
@Test
public void testJsonKafkaDFSSourceWithOffsets() throws Exception {
topicName = "topic" + testNum;
- int numRecords = 15;
- int numPartitions = 3;
+ int numRecords = 30;
+ int numPartitions = 2;
Review Comment:
   Why were these values changed? If it's intentional, a brief note on the reasoning would help.
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/ZookeeperTestService.java:
##########
@@ -163,6 +163,8 @@ private static void setupTestEnv() {
// resulting in test failure (client timeout on first session).
// set env and directly in order to handle static init/gc issues
System.setProperty("zookeeper.preAllocSize", "100");
+ System.setProperty("zookeeper.maxCnxns", "60");
+ System.setProperty("zookeeper.4lw.commands.whitelist", "*");
Review Comment:
   Same question as above. If this addresses a test-environment issue, maybe move it to a separate PR so it can be reviewed and tracked independently?
##########
hudi-integ-test/pom.xml:
##########
@@ -100,6 +98,21 @@
<scope>test</scope>
</dependency>
+ <!-- Parquet -->
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ <scope>test</scope>
+ </dependency>
Review Comment:
   Why are these Parquet dependencies needed here? Also, if we run the integration tests with the hudi-spark3.x-bundle, wouldn't parquet-avro and parquet-hadoop already be present in the classpath?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]