This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c48d7f8087 set reader spec config and add test (#10731)
c48d7f8087 is described below
commit c48d7f8087605ce2370186ca20e51d2354f54f1e
Author: Johan Venant <[email protected]>
AuthorDate: Sat Sep 2 07:51:23 2023 +0200
set reader spec config and add test (#10731)
---
.../SegmentGenerationAndPushTaskExecutor.java | 2 +
.../SegmentGenerationAndPushTaskGeneratorTest.java | 58 +++++++++++++++++++++-
.../src/test/resources/dummyTable.json | 8 +++
3 files changed, 67 insertions(+), 1 deletion(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
index af49c6e2f1..898c1a1eb1 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
@@ -287,6 +287,8 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
recordReaderSpec.setDataFormat(taskConfigs.get(BatchConfigProperties.INPUT_FORMAT));
recordReaderSpec.setClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CLASS));
recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
+
recordReaderSpec.setConfigs(IngestionConfigUtils.getConfigMapWithPrefix(taskConfigs,
+ BatchConfigProperties.RECORD_READER_PROP_PREFIX));
taskSpec.setRecordReaderSpec(recordReaderSpec);
String authToken = taskConfigs.get(BatchConfigProperties.AUTH_TOKEN);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGeneratorTest.java
index 6195b54154..05d86e7ce4 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGeneratorTest.java
@@ -18,19 +18,28 @@
*/
package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;
+import java.net.URL;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.event.DefaultMinionEventObserver;
+import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
/**
* Tests for {@link SegmentGenerationAndPushTaskGeneratorTest}
*/
@@ -78,4 +87,51 @@ public class SegmentGenerationAndPushTaskGeneratorTest
extends ControllerTest {
ControllerTest.sendPostRequest(_controllerRequestURLBuilder.forClusterConfigs(),
request);
Assert.assertEquals(_generator.getNumConcurrentTasksPerInstance(), 1);
}
+
+ @Test
+ public void testGenerateTaskSpec() throws Exception {
+ URL resourcesLoc =
SegmentGenerationAndPushTaskGeneratorTest.class.getClassLoader().getResource(".");
+ SegmentGenerationAndPushTaskExecutor executor = new
SegmentGenerationAndPushTaskExecutor();
+ Schema schema = new Schema.SchemaBuilder().build();
+ FieldUtils.writeField(executor, "_eventObserver", new
DefaultMinionEventObserver(), true);
+ Map<String, String> configMap = Stream.of(new String[][] {
+ {BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, resourcesLoc.toString()
+ "dummyTable.json"},
+ {BatchConfigProperties.INPUT_FORMAT, ""},
+ {BatchConfigProperties.RECORD_READER_CLASS, "AReaderClass"},
+ {BatchConfigProperties.RECORD_READER_CONFIG_CLASS, "AReaderConfigClass"},
+ {BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".prop1", "value1"},
+ {BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".prop.2", "value2"},
+ {BatchConfigProperties.AUTH_TOKEN, "not_used"},
+ {BatchConfigProperties.TABLE_NAME, "not_used"},
+ {BatchConfigProperties.SCHEMA, schema.toSingleLineJsonString()},
+ {BatchConfigProperties.SCHEMA_URI, "not_used"},
+ {BatchConfigProperties.TABLE_CONFIGS,
+ new
String(SegmentGenerationAndPushTaskGeneratorTest.class.getClassLoader()
+ .getResourceAsStream("dummyTable.json").readAllBytes())},
+ {BatchConfigProperties.TABLE_CONFIGS_URI, "not_used"},
+ {BatchConfigProperties.SEQUENCE_ID, "42"},
+ {BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT, "true"},
+ {BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, "inputtext"},
+ {BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX +
".prop.seg.1", "valseg1"},
+ {BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + ".propseg2",
"valseg2"},
+ {BatchConfigProperties.APPEND_UUID_TO_SEGMENT_NAME, "true"}
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1]));
+
+ SegmentGenerationTaskSpec spec = executor.generateTaskSpec(configMap,
Paths.get(resourcesLoc.toURI()).toFile());
+ Assert.assertEquals(spec.getSequenceId(), 42);
+ Assert.assertEquals("file:" + spec.getInputFilePath(),
resourcesLoc.toString() + "input/dummyTable.json");
+ Assert.assertEquals(spec.getRecordReaderSpec().getClassName(),
"AReaderClass");
+ Assert.assertEquals(spec.getRecordReaderSpec().getConfigClassName(),
"AReaderConfigClass");
+ Assert.assertEqualsDeep(spec.getRecordReaderSpec().getConfigs(),
Stream.of(new String[][] {
+ {"prop1", "value1"},
+ {"prop.2", "value2"}
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1])));
+ Assert.assertEquals(spec.isFailOnEmptySegment(), true);
+ Assert.assertEquals(spec.getSegmentNameGeneratorSpec().getType(),
"inputtext");
+ Assert.assertEqualsDeep(spec.getSegmentNameGeneratorSpec().getConfigs(),
Stream.of(new String[][] {
+ {"prop.seg.1", "valseg1"},
+ {"propseg2", "valseg2"},
+ {SegmentGenerationTaskRunner.APPEND_UUID_TO_SEGMENT_NAME, "true"}
+ }).collect(Collectors.toMap(data -> data[0], data -> data[1])));
+ }
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/resources/dummyTable.json
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/resources/dummyTable.json
new file mode 100644
index 0000000000..0257ce75e2
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/resources/dummyTable.json
@@ -0,0 +1,8 @@
+{
+ "tableName": "pinotTable",
+ "tableType": "OFFLINE",
+ "segmentsConfig": {},
+ "tenants": {},
+ "tableIndexConfig": {},
+ "metadata": {}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]