Repository: flink Updated Branches: refs/heads/master 6bce2b833 -> fdae3ae1f
[hotfix][kafka] Extract TransactionalIdsGenerator class from FlinkKafkaProducer011 This is pure refactor without any functional changes. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab00d35b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab00d35b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab00d35b Branch: refs/heads/master Commit: ab00d35b88ce9cf26c66b7cbb21486d1b18573a6 Parents: 6bce2b8 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Mon Nov 6 14:03:16 2017 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Nov 8 09:45:07 2017 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer011.java | 53 +++++-------- .../internal/TransactionalIdsGenerator.java | 81 ++++++++++++++++++++ 2 files changed, 99 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ab00d35b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 873ef08..0310019 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator; import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; @@ -59,7 +60,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.serialization.ByteArraySerializer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,8 +81,6 @@ import java.util.Set; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.LongStream; -import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -183,6 +181,11 @@ public class FlinkKafkaProducer011<IN> private transient ListState<NextTransactionalIdHint> nextTransactionalIdHintState; /** + * Generator for Transactional IDs. + */ + private transient TransactionalIdsGenerator transactionalIdsGenerator; + + /** * Hint for picking next transactional id. */ private transient NextTransactionalIdHint nextTransactionalIdHint; @@ -785,6 +788,11 @@ public class FlinkKafkaProducer011<IN> nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState( NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); + transactionalIdsGenerator = new TransactionalIdsGenerator( + getRuntimeContext().getTaskName(), + getRuntimeContext().getIndexOfThisSubtask(), + kafkaProducersPoolSize, + SAFE_SCALE_DOWN_FACTOR); if (semantic != Semantic.EXACTLY_ONCE) { nextTransactionalIdHint = null; @@ -803,15 +811,8 @@ public class FlinkKafkaProducer011<IN> // (1) the first execution of this application // (2) previous execution has failed before first checkpoint completed // - // in case of (2) we have to abort all previous transactions, but we don't know was the parallelism used - // then, so we must guess using current configured pool size, current parallelism and - // SAFE_SCALE_DOWN_FACTOR - long abortTransactionalIdStart = getRuntimeContext().getIndexOfThisSubtask(); - long abortTransactionalIdEnd = abortTransactionalIdStart + 1; - - abortTransactionalIdStart *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR; - abortTransactionalIdEnd *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR; - abortTransactions(LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd)); + // in case of (2) we have to abort all previous transactions + abortTransactions(transactionalIdsGenerator.generateIdsToAbort()); } else { nextTransactionalIdHint = transactionalIdHints.get(0); } @@ -834,16 +835,7 @@ public class FlinkKafkaProducer011<IN> private Set<String> generateNewTransactionalIds() { checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be present for EXACTLY_ONCE"); - // range of available transactional ids is: - // [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize) - // loop below picks in a deterministic way a subrange of those available transactional ids based on index of - // this subtask - int subtaskId = getRuntimeContext().getIndexOfThisSubtask(); - Set<String> transactionalIds = new HashSet<>(); - for (int i = 0; i < kafkaProducersPoolSize; i++) { - long transactionalId = nextTransactionalIdHint.nextFreeTransactionalId + subtaskId * kafkaProducersPoolSize + i; - transactionalIds.add(generateTransactionalId(transactionalId)); - } + Set<String> transactionalIds = transactionalIdsGenerator.generateIdsToUse(nextTransactionalIdHint.nextFreeTransactionalId); LOG.info("Generated new transactionalIds {}", transactionalIds); return transactionalIds; } @@ -862,7 +854,7 @@ public class FlinkKafkaProducer011<IN> if (!getUserContext().isPresent()) { return; } - abortTransactions(getUserContext().get().transactionalIds.stream()); + abortTransactions(getUserContext().get().transactionalIds); } private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) { @@ -874,22 +866,13 @@ public class FlinkKafkaProducer011<IN> // ----------------------------------- Utilities -------------------------- - private void abortTransactions(LongStream transactionalIds) { - abortTransactions(transactionalIds.mapToObj(this::generateTransactionalId)); - } - - private void abortTransactions(Stream<String> transactionalIds) { - transactionalIds.forEach(transactionalId -> { + private void abortTransactions(Set<String> transactionalIds) { + for (String transactionalId : transactionalIds) { try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer = initTransactionalProducer(transactionalId, false)) { kafkaProducer.initTransactions(); } - }); - } - - private String generateTransactionalId(long transactionalId) { - String transactionalIdFormat = getRuntimeContext().getTaskName() + "-%d"; - return String.format(transactionalIdFormat, transactionalId); + } } int getTransactionCoordinatorId() { http://git-wip-us.apache.org/repos/asf/flink/blob/ab00d35b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java new file mode 100644 index 0000000..eabdad9 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Class responsible for generating transactional ids to use when communicating with Kafka. + */ +public class TransactionalIdsGenerator { + private final String prefix; + private final int subtaskIndex; + private final int poolSize; + private final int safeScaleDownFactor; + + public TransactionalIdsGenerator( + String prefix, + int subtaskIndex, + int poolSize, + int safeScaleDownFactor) { + this.prefix = checkNotNull(prefix); + this.subtaskIndex = subtaskIndex; + this.poolSize = poolSize; + this.safeScaleDownFactor = safeScaleDownFactor; + } + + /** + * Range of available transactional ids to use is: + * [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize) + * loop below picks in a deterministic way a subrange of those available transactional ids based on index of + * this subtask. + */ + public Set<String> generateIdsToUse(long nextFreeTransactionalId) { + Set<String> transactionalIds = new HashSet<>(); + for (int i = 0; i < poolSize; i++) { + long transactionalId = nextFreeTransactionalId + subtaskIndex * poolSize + i; + transactionalIds.add(generateTransactionalId(transactionalId)); + } + return transactionalIds; + } + + /** + * If we have to abort previous transactional id in case of restart after a failure BEFORE first checkpoint + * completed, we don't know what was the parallelism used in previous attempt. In that case we must guess the ids + * range to abort based on current configured pool size, current parallelism and safeScaleDownFactor. + */ + public Set<String> generateIdsToAbort() { + long abortTransactionalIdStart = subtaskIndex; + long abortTransactionalIdEnd = abortTransactionalIdStart + 1; + + abortTransactionalIdStart *= poolSize * safeScaleDownFactor; + abortTransactionalIdEnd *= poolSize * safeScaleDownFactor; + return LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd) + .mapToObj(this::generateTransactionalId) + .collect(Collectors.toSet()); + } + + private String generateTransactionalId(long transactionalId) { + return String.format(prefix + "-%d", transactionalId); + } +}