linlinnn commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615369165
##########
File path:
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
new SinkSpec("test-kv-sink-input-avro-" + randomName(8),
"test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class),
Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-kv-string-int-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new
KeyValue<>("foo", 123)),
- new SinkSpec("test-kv-sink-input-kv-avro-json-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
- Schema.KeyValue(Schema.AVRO(PojoKey.class),
Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(),
Pojo.builder().field1("a").field2(2).build()))
+ new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+ Schema.KeyValue(Schema.AVRO(PojoKey.class),
Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new
KeyValue<>(PojoKey.builder().field1("a").build(),
Pojo.builder().field1("a").field2(2).build())),
+ new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+ Schema.KeyValue(Schema.AVRO(PojoKey.class),
Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new
KeyValue<>(PojoKey.builder().field1("a").build(),
Pojo.builder().field1("a").field2(2).build()))
);
- // submit all sinks
+
+ final int numRecords = 2;
+
for (SinkSpec spec : specs) {
submitSinkConnector(spec.sinkName, spec.outputTopicName,
"org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
- }
- // check all sinks
- for (SinkSpec spec : specs) {
+
// get sink info
getSinkInfoSuccess(spec.sinkName);
getSinkStatus(spec.sinkName);
- }
- final int numRecords = 10;
-
- for (SinkSpec spec : specs) {
@Cleanup Producer<Object> producer =
client.newProducer(spec.schema)
.topic(spec.outputTopicName)
.create();
for (int i = 0; i < numRecords; i++) {
MessageId messageId = producer.newMessage()
.value(spec.testValue)
.property("expectedType",
spec.schema.getSchemaInfo().getType().toString())
+ .property("recordNumber", i + "")
.send();
log.info("sent message {} {} with ID {}", spec.testValue,
spec.schema.getSchemaInfo().getType().toString(), messageId);
}
- }
- // wait that all sinks processed all records without errors
- try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
- for (SinkSpec spec : specs) {
- try {
- log.info("waiting for sink {}", spec.sinkName);
- for (int i = 0; i < 120; i++) {
- SinkStatus status =
admin.sinks().getSinkStatus("public", "default", spec.sinkName);
- log.info("sink {} status {}", spec.sinkName, status);
- assertEquals(status.getInstances().size(), 1);
- SinkStatus.SinkInstanceStatus instance =
status.getInstances().get(0);
- if (instance.getStatus().numWrittenToSink >=
numRecords) {
- break;
- }
- assertTrue(instance.getStatus().numRestarts > 1, "Sink
was restarted, probably an error occurred");
- Thread.sleep(1000);
- }
+ // wait that all sinks processed all records without errors
+
+ try {
+ log.info("waiting for sink {}", spec.sinkName);
+ for (int i = 0; i < 120; i++) {
SinkStatus status = admin.sinks().getSinkStatus("public",
"default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
-
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >=
numRecords);
-
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
- log.info("sink {} is okay", spec.sinkName);
- } finally {
- dumpSinkLogs(spec);
+ SinkStatus.SinkInstanceStatus instance =
status.getInstances().get(0);
+ if (instance.getStatus().numWrittenToSink >= numRecords) {
+ break;
+ }
+ assertTrue(instance.getStatus().numRestarts > 1, "Sink was
restarted, probably an error occurred");
+ Thread.sleep(1000);
Review comment:
Thanks, I see.
What about moving the check that is below `// wait that all sinks processed all
records without errors` out of the loop?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]