This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c1d305b1f3c60014d9982f0c189eecf3d0facafa Author: Penghui Li <[email protected]> AuthorDate: Thu Aug 18 11:21:14 2022 +0800 [fix][broker] Fix schema does not replicate successfully (#17049) However, there is a mistake: the returned schema state is incorrect. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L765-L770 A MessageImpl used by the replicator does not have a schema, so the wrong schema state is returned for it, and this causes the producer to skip the schema upload. https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2147-L2149 We should remove https://github.com/apache/pulsar/blob/e826d849ceef9d6aef28569ad57950bba90dfff1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java#L766-L768 so that the correct schema state is returned. We should then also provide the correct schema hash. If the message is used by the replicator, the schema hash should be based on the replicator schema; otherwise, it should be based on the schema of the message. 
- Fixed the incorrect returned schema state - Provide the method for getting schema hash for MessageImpl (cherry picked from commit 7689133adfd930a50c2690ecca1f2068cafa8bcb) --- build/run_unit_group.sh | 2 +- .../pulsar/broker/service/ReplicatorTest.java | 57 ++++++++++++---------- .../org/apache/pulsar/client/impl/MessageImpl.java | 12 +++-- .../apache/pulsar/client/impl/ProducerImpl.java | 9 ++-- .../client/impl/MultiTopicsConsumerImplTest.java | 2 +- .../pulsar/common/protocol/schema/SchemaHash.java | 7 ++- 6 files changed, 52 insertions(+), 37 deletions(-) diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh index 19dc713d700..a5af838e4d4 100755 --- a/build/run_unit_group.sh +++ b/build/run_unit_group.sh @@ -117,7 +117,7 @@ function other() { **/ManagedLedgerTest.java, **/TestPulsarKeyValueSchemaHandler.java, **/PrimitiveSchemaTest.java, - BlobStoreManagedLedgerOffloaderTest.java' + BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false $MVN_TEST_COMMAND -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java, **/OffloadersCacheTest.java' diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index c31373c05fc..21ca713aa30 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; @@ -70,6 +71,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -78,6 +80,7 @@ import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -391,19 +394,24 @@ public class ReplicatorTest extends ReplicatorTestBase { final String subName = "my-sub"; @Cleanup - Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topic.toString()) - .create(); - @Cleanup - Producer<Schemas.PersonOne> producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topic.toString()) - .create(); - @Cleanup - Producer<Schemas.PersonOne> producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class)) .topic(topic.toString()) .create(); - List<Producer<Schemas.PersonOne>> producers = Lists.newArrayList(producer1, producer2, producer3); + admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest); + admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest); + admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest); + + + for (int i = 0; i < 10; i++) { + producer.send(new Schemas.PersonOne(i)); + } + + Awaitility.await().untilAsserted(() -> { + 
assertTrue(admin1.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0); + assertTrue(admin2.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0); + assertTrue(admin3.topics().getInternalStats(topic.toString()).schemaLedgers.size() > 0); + }); @Cleanup Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class)) @@ -423,8 +431,7 @@ public class ReplicatorTest extends ReplicatorTestBase { .subscriptionName(subName) .subscribe(); - for (int i = 0; i < 3; i++) { - producers.get(i).send(new Schemas.PersonOne(i)); + for (int i = 0; i < 10; i++) { Message<Schemas.PersonOne> msg1 = consumer1.receive(); Message<Schemas.PersonOne> msg2 = consumer2.receive(); Message<Schemas.PersonOne> msg3 = consumer3.receive(); @@ -1390,15 +1397,21 @@ public class ReplicatorTest extends ReplicatorTestBase { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false) .getNow(null).get(); + MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); + Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators(); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); - Awaitility.await().timeout(50, TimeUnit.SECONDS) + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started, replicator.getState())); assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started); ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor(); + + // Make sure all the data has replicated to the remote cluster before close the cursor. 
+ Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition)); + cursor.setState(State.Closed); Field field = ManagedCursorImpl.class.getDeclaredField("state"); @@ -1407,22 +1420,16 @@ public class ReplicatorTest extends ReplicatorTestBase { producer1.produce(10); - Position deletedPos = cursor.getMarkDeletedPosition(); - Position readPos = cursor.getReadPosition(); - - Awaitility.await().timeout(30, TimeUnit.SECONDS).until( - () -> cursor.getMarkDeletedPosition().getEntryId() != (cursor.getReadPosition().getEntryId() - 1)); - - assertNotEquals((readPos.getEntryId() - 1), deletedPos.getEntryId()); + // The cursor is closed, so the mark delete position will not move forward. + assertEquals(cursor.getMarkDeletedPosition(), lastPosition); field.set(cursor, State.Open); Awaitility.await().timeout(30, TimeUnit.SECONDS).until( - () -> cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1)); - - deletedPos = cursor.getMarkDeletedPosition(); - readPos = cursor.getReadPosition(); - assertEquals((readPos.getEntryId() - 1), deletedPos.getEntryId()); + () -> { + log.info("++++++++++++ {}, {}", cursor.getMarkDeletedPosition(), cursor.getReadPosition()); + return cursor.getMarkDeletedPosition().getEntryId() == (cursor.getReadPosition().getEntryId() - 1); + }); } private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 2fb9311b4b5..6863a34f80f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -52,6 +52,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.protocol.Commands; import 
org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; +import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -64,6 +65,8 @@ public class MessageImpl<T> implements Message<T> { private ByteBuf payload; private Schema<T> schema; + + private SchemaHash schemaHash; private SchemaInfo schemaInfoForReplicator; private SchemaState schemaState = SchemaState.None; private Optional<EncryptionContext> encryptionCtx = Optional.empty(); @@ -91,6 +94,7 @@ public class MessageImpl<T> implements Message<T> { msg.payload = Unpooled.wrappedBuffer(payload); msg.properties = null; msg.schema = schema; + msg.schemaHash = SchemaHash.of(schema); msg.uncompressedSize = payload.remaining(); return msg; } @@ -431,9 +435,14 @@ public class MessageImpl<T> implements Message<T> { return schema.getSchemaInfo(); } + public SchemaHash getSchemaHash() { + return schemaHash == null ? 
SchemaHash.of(new byte[0], null) : schemaHash; + } + public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) { if (msgMetadata.hasReplicatedFrom()) { this.schemaInfoForReplicator = schemaInfo; + this.schemaHash = SchemaHash.of(schemaInfo); } else { throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message."); } @@ -755,9 +764,6 @@ public class MessageImpl<T> implements Message<T> { } SchemaState getSchemaState() { - if (getSchemaInfo() == null) { - return SchemaState.Ready; - } return schemaState; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9c25f269cee..133a4eca0cd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -661,8 +661,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e); return false; } - SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); - byte[] schemaVersion = schemaCache.get(schemaHash); + byte[] schemaVersion = schemaCache.get(msg.getSchemaHash()); if (schemaVersion != null) { msgMetadataBuilder.setSchemaVersion(schemaVersion); msg.setSchemaState(MessageImpl.SchemaState.Ready); @@ -671,8 +670,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } private boolean rePopulateMessageSchema(MessageImpl msg) { - SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); - byte[] schemaVersion = schemaCache.get(schemaHash); + byte[] schemaVersion = schemaCache.get(msg.getSchemaHash()); if (schemaVersion == null) { return false; } @@ -703,8 +701,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // case, we should not cache the schema version so that the schema 
version of the message metadata will // be null, instead of an empty array. if (v.length != 0) { - SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal()); - schemaCache.putIfAbsent(schemaHash, v); + schemaCache.putIfAbsent(msg.getSchemaHash(), v); msg.getMessageBuilder().setSchemaVersion(v); } msg.setSchemaState(MessageImpl.SchemaState.Ready); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 297059b5fee..89229c49ab1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -122,7 +122,7 @@ public class MultiTopicsConsumerImplTest { // // Code under tests is using CompletableFutures. Theses may hang indefinitely if code is broken. // That's why a test timeout is defined. - @Test(timeOut = 5000) + @Test(timeOut = 10000) public void testParallelSubscribeAsync() throws Exception { String topicName = "parallel-subscribe-async-topic"; MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java index 40220e6047a..8bbc18fbb70 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java @@ -54,7 +54,12 @@ public class SchemaHash { return of(schemaData.getData(), schemaData.getType()); } - private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) { + public static SchemaHash of(SchemaInfo schemaInfo) { + return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(), + schemaInfo == null ? 
null : schemaInfo.getType()); + } + + public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) { return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType); }
