linlinnn commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615369165



##########
File path: 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), 
"test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), 
Pojo.builder().field1("a").field2(2).build()),
                 new SinkSpec("test-kv-sink-input-kv-string-int-" + 
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new 
KeyValue<>("foo", 123)),
-                new SinkSpec("test-kv-sink-input-kv-avro-json-" + 
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
-                        Schema.KeyValue(Schema.AVRO(PojoKey.class), 
Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), 
Pojo.builder().field1("a").field2(2).build()))
+                new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + 
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), 
Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new 
KeyValue<>(PojoKey.builder().field1("a").build(), 
Pojo.builder().field1("a").field2(2).build())),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + 
randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), 
Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new 
KeyValue<>(PojoKey.builder().field1("a").build(), 
Pojo.builder().field1("a").field2(2).build()))
         );
-        // submit all sinks
+
+        final int numRecords = 2;
+
         for (SinkSpec spec : specs) {
             submitSinkConnector(spec.sinkName, spec.outputTopicName, 
"org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
-        }
-        // check all sinks
-        for (SinkSpec spec : specs) {
+
             // get sink info
             getSinkInfoSuccess(spec.sinkName);
             getSinkStatus(spec.sinkName);
-        }
 
-        final int numRecords = 10;
-
-        for (SinkSpec spec : specs) {
             @Cleanup Producer<Object> producer = 
client.newProducer(spec.schema)
                     .topic(spec.outputTopicName)
                     .create();
             for (int i = 0; i < numRecords; i++) {
                 MessageId messageId = producer.newMessage()
                         .value(spec.testValue)
                         .property("expectedType", 
spec.schema.getSchemaInfo().getType().toString())
+                        .property("recordNumber", i + "")
                         .send();
                 log.info("sent message {} {}  with ID {}", spec.testValue, 
spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
-        }
 
-        // wait that all sinks processed all records without errors
-        try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-            for (SinkSpec spec : specs) {
-                try {
-                    log.info("waiting for sink {}", spec.sinkName);
-                    for (int i = 0; i < 120; i++) {
-                        SinkStatus status = 
admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                        log.info("sink {} status {}", spec.sinkName, status);
-                        assertEquals(status.getInstances().size(), 1);
-                        SinkStatus.SinkInstanceStatus instance = 
status.getInstances().get(0);
-                        if (instance.getStatus().numWrittenToSink >= 
numRecords) {
-                            break;
-                        }
-                        assertTrue(instance.getStatus().numRestarts > 1, "Sink 
was restarted, probably an error occurred");
-                        Thread.sleep(1000);
-                    }
 
+            // wait that all sinks processed all records without errors
+
+            try {
+                log.info("waiting for sink {}", spec.sinkName);
+                for (int i = 0; i < 120; i++) {
                     SinkStatus status = admin.sinks().getSinkStatus("public", 
"default", spec.sinkName);
                     log.info("sink {} status {}", spec.sinkName, status);
                     assertEquals(status.getInstances().size(), 1);
-                    
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= 
numRecords);
-                    
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-                    
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                    log.info("sink {} is okay", spec.sinkName);
-                } finally {
-                    dumpSinkLogs(spec);
+                    SinkStatus.SinkInstanceStatus instance = 
status.getInstances().get(0);
+                    if (instance.getStatus().numWrittenToSink >= numRecords) {
+                        break;
+                    }
+                    assertTrue(instance.getStatus().numRestarts > 1, "Sink was 
restarted, probably an error occurred");
+                    Thread.sleep(1000);

Review comment:
       Thank, I see.
   What about moving the check that below `// wait that all sinks processed all 
records without errors` out of the loop




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to