This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 30a52215f832309cbc1913090cae56ace87feaa2 Author: Matteo Merli <[email protected]> AuthorDate: Tue Jun 30 18:39:01 2020 -0700 Avoid NPEs at ledger creation when DNS failures happen (#7403) * Avoid NPEs at ledger creation when DNS failures happen * Removed unnecessary try/catch (cherry picked from commit a230427f78e7cc844b2a22ababd05d217d9164bf) --- .gitignore | 1 + .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +-- .../service/schema/BookkeeperSchemaStorage.java | 33 +++++++++++++--------- .../pulsar/compaction/TwoPhaseCompactor.java | 31 ++++++++++++-------- 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index f6c7c3a..297f31d 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,4 @@ docker.debug-info examples/flink/src/main/java/org/apache/flink/avro/generated pulsar-flink/src/test/java/org/apache/flink/avro/generated pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated +/build/ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 66e977f..3e0d583 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2283,6 +2283,7 @@ public class ManagedCursorImpl implements ManagedCursor { void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); + ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> { if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) { @@ -2349,7 +2350,6 @@ public class ManagedCursorImpl implements ManagedCursor { }); })); }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name)); - } private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) { @@ -2818,7 +2818,7 @@ public class ManagedCursorImpl implements ManagedCursor { return null; } } - + void updateReadStats(int readEntriesCount, long readEntriesSize) { this.entriesReadCount += readEntriesCount; this.entriesReadSize += readEntriesSize; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 05a9911..8b31358 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -53,6 +53,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.LongSchemaVersion; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.protocol.schema.StoredSchema; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.zookeeper.CreateMode; @@ -500,20 +501,24 @@ public class BookkeeperSchemaStorage implements SchemaStorage { private CompletableFuture<LedgerHandle> createLedger(String schemaId) { Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForSchema(schemaId); final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); - bookKeeper.asyncCreateLedger( - config.getManagedLedgerDefaultEnsembleSize(), - config.getManagedLedgerDefaultWriteQuorum(), - config.getManagedLedgerDefaultAckQuorum(), - BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()), - LedgerPassword, - (rc, handle, ctx) -> { - if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1)); - } else { - future.complete(handle); - } - }, null, metadata - ); + try { + bookKeeper.asyncCreateLedger( + config.getManagedLedgerDefaultEnsembleSize(), + config.getManagedLedgerDefaultWriteQuorum(), + config.getManagedLedgerDefaultAckQuorum(), + BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()), + LedgerPassword, + (rc, handle, ctx) -> { + if (rc != BKException.Code.OK) { + future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1)); + } else { + future.complete(handle); + } + }, null, metadata); + } catch (Throwable t) { + log.error("[{}] Encountered unexpected error when creating schema ledger", schemaId, t); + return FutureUtil.failedFuture(t); + } return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index df7c79b..4b7c8bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.RawBatchConverter; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -310,18 +311,24 @@ public class TwoPhaseCompactor extends Compactor { private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String,byte[]> metadata) { CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>(); - bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(), - conf.getManagedLedgerDefaultWriteQuorum(), - conf.getManagedLedgerDefaultAckQuorum(), - Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, - Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, - (rc, ledger, ctx) -> { - if (rc != BKException.Code.OK) { - bkf.completeExceptionally(BKException.create(rc)); - } else { - bkf.complete(ledger); - } - }, null, metadata); + + try { + bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(), + conf.getManagedLedgerDefaultWriteQuorum(), + conf.getManagedLedgerDefaultAckQuorum(), + Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, + Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, + (rc, ledger, ctx) -> { + if (rc != BKException.Code.OK) { + bkf.completeExceptionally(BKException.create(rc)); + } else { + bkf.complete(ledger); + } + }, null, metadata); + } catch (Throwable t) { + log.error("Encountered unexpected error when creating compaction ledger", t); + return FutureUtil.failedFuture(t); + } return bkf; }
