eolivelli commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615345856
##########
File path:
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
new SinkSpec("test-kv-sink-input-avro-" + randomName(8),
"test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class),
Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-kv-string-int-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new
KeyValue<>("foo", 123)),
- new SinkSpec("test-kv-sink-input-kv-avro-json-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
- Schema.KeyValue(Schema.AVRO(PojoKey.class),
Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(),
Pojo.builder().field1("a").field2(2).build()))
+ new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+ Schema.KeyValue(Schema.AVRO(PojoKey.class),
Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new
KeyValue<>(PojoKey.builder().field1("a").build(),
Pojo.builder().field1("a").field2(2).build())),
+ new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" +
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+ Schema.KeyValue(Schema.AVRO(PojoKey.class),
Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new
KeyValue<>(PojoKey.builder().field1("a").build(),
Pojo.builder().field1("a").field2(2).build()))
);
- // submit all sinks
+
+ final int numRecords = 2;
+
for (SinkSpec spec : specs) {
submitSinkConnector(spec.sinkName, spec.outputTopicName,
"org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
- }
- // check all sinks
- for (SinkSpec spec : specs) {
+
// get sink info
getSinkInfoSuccess(spec.sinkName);
getSinkStatus(spec.sinkName);
- }
- final int numRecords = 10;
-
- for (SinkSpec spec : specs) {
@Cleanup Producer<Object> producer =
client.newProducer(spec.schema)
.topic(spec.outputTopicName)
.create();
for (int i = 0; i < numRecords; i++) {
MessageId messageId = producer.newMessage()
.value(spec.testValue)
.property("expectedType",
spec.schema.getSchemaInfo().getType().toString())
+ .property("recordNumber", i + "")
.send();
log.info("sent message {} {} with ID {}", spec.testValue,
spec.schema.getSchemaInfo().getType().toString(), messageId);
}
- }
- // wait that all sinks processed all records without errors
- try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
- for (SinkSpec spec : specs) {
- try {
- log.info("waiting for sink {}", spec.sinkName);
- for (int i = 0; i < 120; i++) {
- SinkStatus status =
admin.sinks().getSinkStatus("public", "default", spec.sinkName);
- log.info("sink {} status {}", spec.sinkName, status);
- assertEquals(status.getInstances().size(), 1);
- SinkStatus.SinkInstanceStatus instance =
status.getInstances().get(0);
- if (instance.getStatus().numWrittenToSink >=
numRecords) {
- break;
- }
- assertTrue(instance.getStatus().numRestarts > 1, "Sink
was restarted, probably an error occurred");
- Thread.sleep(1000);
- }
+ // wait that all sinks processed all records without errors
+
+ try {
+ log.info("waiting for sink {}", spec.sinkName);
+ for (int i = 0; i < 120; i++) {
SinkStatus status = admin.sinks().getSinkStatus("public",
"default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
-
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >=
numRecords);
-
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
- log.info("sink {} is okay", spec.sinkName);
- } finally {
- dumpSinkLogs(spec);
+ SinkStatus.SinkInstanceStatus instance =
status.getInstances().get(0);
+ if (instance.getStatus().numWrittenToSink >= numRecords) {
+ break;
+ }
+ assertTrue(instance.getStatus().numRestarts > 1, "Sink was
restarted, probably an error occurred");
+ Thread.sleep(1000);
Review comment:
As soon as the records are processed we exit the loop. Usually it takes
only one cycle, because the sink receives the records almost
immediately.
BTW I am thinking of reworking this test to submit the sink only once,
connecting it to multiple topics.
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]