[SPARK-24882][SQL] Revert [] improve data source v2 API from branch 2.4

## What changes were proposed in this pull request?

As discussed on the dev list, we don't want to include https://github.com/apache/spark/pull/22009 in Spark 2.4: it would require data source v2 implementers to change their implementations extensively, only to change them again in the next release.
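For context, the sketch below shows the provider-side shape that branch-2.4 keeps after this revert, using only the interfaces restored in the diff (`MicroBatchReadSupport`, `MicroBatchReader`, `InputPartition`, `InputPartitionReader`). It is a minimal sketch, not code from this patch; the `ExampleSource`/`ExampleOffset`/`ExamplePartition` names are hypothetical.

```scala
import java.util.{List => JList, Optional}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{LongType, StructType}

// Hypothetical offset: a single monotonically increasing long, serialized as its string form.
case class ExampleOffset(value: Long) extends Offset {
  override def json: String = value.toString
}

// The provider mixin restored by this revert: the engine asks it for a MicroBatchReader.
class ExampleSource extends DataSourceV2 with MicroBatchReadSupport {
  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceOptions): MicroBatchReader = new ExampleMicroBatchReader
}

class ExampleMicroBatchReader extends MicroBatchReader {
  private var start = ExampleOffset(0L)
  private var end = ExampleOffset(0L)

  // In the reverted-to API the engine pushes the offset range into the reader,
  // rather than going through the ScanConfigBuilder indirection removed here.
  override def setOffsetRange(startOpt: Optional[Offset], endOpt: Optional[Offset]): Unit = {
    start = startOpt.orElse(ExampleOffset(0L)).asInstanceOf[ExampleOffset]
    end = endOpt.orElse(ExampleOffset(start.value + 10L)).asInstanceOf[ExampleOffset]
  }

  override def getStartOffset: Offset = start
  override def getEndOffset: Offset = end
  override def deserializeOffset(json: String): Offset = ExampleOffset(json.toLong)
  override def readSchema(): StructType = new StructType().add("value", LongType)
  override def commit(end: Offset): Unit = {}
  override def stop(): Unit = {}

  // One partition covering [start, end); a real source would split the range per task.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Seq(ExamplePartition(start.value, end.value): InputPartition[InternalRow]).asJava
}

case class ExamplePartition(from: Long, until: Long) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      private var current = from - 1
      override def next(): Boolean = { current += 1; current < until }
      override def get(): InternalRow = InternalRow(current)
      override def close(): Unit = {}
    }
}
```

On master, an equivalent source instead implements the `ReadSupport`/`ScanConfig`-style interfaces that this patch removes from branch-2.4; keeping 2.4 on the older shape avoids forcing implementers through that churn twice.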
## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #22388 from cloud-fan/revert.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15d2e9d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15d2e9d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15d2e9d7

Branch: refs/heads/branch-2.4
Commit: 15d2e9d7d2f0d5ecefd69bdc3f8a149670b05e79
Parents: 4c1428f
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Sep 12 11:25:24 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Sep 12 11:25:24 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousReadSupport.scala | 255 -----
 .../sql/kafka010/KafkaContinuousReader.scala | 248 +++++++++++
 .../kafka010/KafkaMicroBatchReadSupport.scala | 377 ----
 .../sql/kafka010/KafkaMicroBatchReader.scala | 382 ++++++++++++++++
 .../sql/kafka010/KafkaSourceProvider.scala | 37 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala | 118 +++++
 .../kafka010/KafkaStreamingWriteSupport.scala | 118 -----
 .../kafka010/KafkaContinuousSourceSuite.scala | 12 +-
 .../sql/kafka010/KafkaContinuousTest.scala | 8 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala | 35 +-
 .../sources/v2/BatchReadSupportProvider.java | 61 ---
 .../sources/v2/BatchWriteSupportProvider.java | 59 ---
 .../sql/sources/v2/ContinuousReadSupport.java | 46 ++
 .../v2/ContinuousReadSupportProvider.java | 70 ---
 .../spark/sql/sources/v2/DataSourceV2.java | 10 +-
 .../sql/sources/v2/MicroBatchReadSupport.java | 52 +++
 .../v2/MicroBatchReadSupportProvider.java | 70 ---
 .../spark/sql/sources/v2/ReadSupport.java | 65 +++
 .../sql/sources/v2/SessionConfigSupport.java | 7 +-
 .../sql/sources/v2/StreamWriteSupport.java | 52 +++
 .../v2/StreamingWriteSupportProvider.java | 54 ---
 .../spark/sql/sources/v2/WriteSupport.java | 53 +++
 .../sql/sources/v2/reader/BatchReadSupport.java | 51 ---
 .../v2/reader/ContinuousInputPartition.java | 35 ++
 .../sql/sources/v2/reader/DataSourceReader.java | 75 ++++
 .../sql/sources/v2/reader/InputPartition.java | 26 +-
 .../sources/v2/reader/InputPartitionReader.java | 53 +++
 .../sql/sources/v2/reader/PartitionReader.java | 49 ---
 .../v2/reader/PartitionReaderFactory.java | 66 ---
 .../sql/sources/v2/reader/ReadSupport.java | 50 ---
 .../spark/sql/sources/v2/reader/ScanConfig.java | 45 --
 .../sources/v2/reader/ScanConfigBuilder.java | 30 --
 .../spark/sql/sources/v2/reader/Statistics.java | 2 +-
 .../v2/reader/SupportsPushDownFilters.java | 6 +-
 .../reader/SupportsPushDownRequiredColumns.java | 8 +-
 .../v2/reader/SupportsReportPartitioning.java | 12 +-
 .../v2/reader/SupportsReportStatistics.java | 14 +-
 .../v2/reader/SupportsScanColumnarBatch.java | 53 +++
 .../partitioning/ClusteredDistribution.java | 4 +-
 .../v2/reader/partitioning/Distribution.java | 6 +-
 .../v2/reader/partitioning/Partitioning.java | 5 +-
 .../ContinuousInputPartitionReader.java | 36 ++
 .../streaming/ContinuousPartitionReader.java | 37 --
 .../ContinuousPartitionReaderFactory.java | 40 --
 .../reader/streaming/ContinuousReadSupport.java | 77 ----
 .../v2/reader/streaming/ContinuousReader.java | 79 ++++
 .../reader/streaming/MicroBatchReadSupport.java | 60 ---
 .../v2/reader/streaming/MicroBatchReader.java | 75 ++++
 .../sql/sources/v2/reader/streaming/Offset.java | 4 +-
 .../reader/streaming/StreamingReadSupport.java | 49 ---
 .../sources/v2/writer/BatchWriteSupport.java | 101 -----
 .../sql/sources/v2/writer/DataSourceWriter.java | 116 +++++
 .../spark/sql/sources/v2/writer/DataWriter.java | 16 +-
 .../sources/v2/writer/DataWriterFactory.java | 23 +-
 .../sources/v2/writer/WriterCommitMessage.java | 9 +-
 .../v2/writer/streaming/StreamWriter.java | 71 +++
 .../streaming/StreamingDataWriterFactory.java | 59 ---
 .../writer/streaming/StreamingWriteSupport.java | 71 ---
 .../org/apache/spark/sql/DataFrameReader.scala | 4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala | 8 +-
 .../datasources/v2/DataSourceRDD.scala | 44 +-
 .../datasources/v2/DataSourceV2Relation.scala | 72 ++-
 .../datasources/v2/DataSourceV2ScanExec.scala | 65 +--
 .../datasources/v2/DataSourceV2Strategy.scala | 49 +--
 .../datasources/v2/DataSourceV2Utils.scala | 9 -
 .../v2/WriteToDataSourceV2Exec.scala | 40 +-
 .../streaming/MicroBatchExecution.scala | 91 ++--
 .../execution/streaming/ProgressReporter.scala | 8 +-
 .../SimpleStreamingScanConfigBuilder.scala | 40 --
 .../execution/streaming/StreamingRelation.scala | 6 +-
 .../spark/sql/execution/streaming/console.scala | 14 +-
 .../continuous/ContinuousDataSourceRDD.scala | 37 +-
 .../continuous/ContinuousExecution.scala | 51 +--
 .../continuous/ContinuousQueuedDataReader.scala | 29 +-
 .../continuous/ContinuousRateStreamSource.scala | 60 +--
 .../continuous/ContinuousTextSocketSource.scala | 72 ++-
 .../continuous/ContinuousWriteRDD.scala | 7 +-
 .../streaming/continuous/EpochCoordinator.scala | 18 +-
 .../WriteToContinuousDataSource.scala | 4 +-
 .../WriteToContinuousDataSourceExec.scala | 10 +-
 .../spark/sql/execution/streaming/memory.scala | 51 ++-
 .../streaming/sources/ConsoleWriteSupport.scala | 71 ---
 .../streaming/sources/ConsoleWriter.scala | 72 +++
 .../sources/ContinuousMemoryStream.scala | 76 ++--
 .../sources/ForeachWriteSupportProvider.scala | 140 ------
 .../sources/ForeachWriterProvider.scala | 139 ++++++
 .../sources/MicroBatchWritSupport.scala | 51 ---
 .../streaming/sources/MicroBatchWriter.scala | 37 ++
 .../sources/PackedRowWriterFactory.scala | 9 +-
 .../RateControlMicroBatchReadSupport.scala | 31 --
 .../RateStreamMicroBatchReadSupport.scala | 215 ---------
 .../sources/RateStreamMicroBatchReader.scala | 220 ++++++++++
 .../streaming/sources/RateStreamProvider.scala | 27 +-
 .../execution/streaming/sources/memoryV2.scala | 35 +-
 .../execution/streaming/sources/socket.scala | 114 ++---
 .../spark/sql/streaming/DataStreamReader.scala | 52 +--
 .../spark/sql/streaming/DataStreamWriter.scala | 9 +-
 .../sql/streaming/StreamingQueryManager.scala | 4 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java | 147 +++---
 .../sql/sources/v2/JavaBatchDataSourceV2.java | 114 +++++
 .../sources/v2/JavaColumnarDataSourceV2.java | 114 -----
 .../v2/JavaPartitionAwareDataSource.java | 81 ++--
 .../v2/JavaSchemaRequiredDataSource.java | 26 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java | 68 ++-
 .../sql/sources/v2/JavaSimpleReadSupport.java | 99 -----
 ....apache.spark.sql.sources.DataSourceRegister | 4 +-
 .../execution/streaming/MemorySinkV2Suite.scala | 2 +-
 .../sources/ConsoleWriteSupportSuite.scala | 151 -------
 .../streaming/sources/ConsoleWriterSuite.scala | 153 +++++++
 .../sources/RateStreamProviderSuite.scala | 84 ++--
 .../sources/TextSocketStreamSuite.scala | 81 ++--
 .../sql/sources/v2/DataSourceV2Suite.scala | 438 +++++++++----------
 .../sources/v2/SimpleWritableDataSource.scala | 110 +++--
 .../apache/spark/sql/streaming/StreamTest.scala | 2 +-
 .../streaming/StreamingQueryListenerSuite.scala | 4 +-
 .../sql/streaming/StreamingQuerySuite.scala | 58 ++-
 .../ContinuousQueuedDataReaderSuite.scala | 45 +-
 .../streaming/continuous/ContinuousSuite.scala | 2 +-
 .../continuous/EpochCoordinatorSuite.scala | 18 +-
 .../sources/StreamingDataSourceV2Suite.scala | 95 ++--
 120 files changed, 3619 insertions(+), 4070 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala deleted file mode 100644 index 1753a28..0000000 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.{util => ju} -import java.util.concurrent.TimeoutException - -import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException} -import org.apache.kafka.common.TopicPartition - -import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} -import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming._ -import org.apache.spark.sql.types.StructType - -/** - * A [[ContinuousReadSupport]] for data from kafka. - * - * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be - * read by per-task consumers generated later. - * @param kafkaParams String params for per-task Kafka consumers. - * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which - * are not Kafka consumer params. - * @param metadataPath Path to a directory this reader can use for writing metadata. - * @param initialOffsets The Kafka offsets to start reading data at. - * @param failOnDataLoss Flag indicating whether reading should fail in data loss - * scenarios, where some offsets after the specified initial ones can't be - * properly read.
- */ -class KafkaContinuousReadSupport( - offsetReader: KafkaOffsetReader, - kafkaParams: ju.Map[String, Object], - sourceOptions: Map[String, String], - metadataPath: String, - initialOffsets: KafkaOffsetRangeLimit, - failOnDataLoss: Boolean) - extends ContinuousReadSupport with Logging { - - private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong - - override def initialOffset(): Offset = { - val offsets = initialOffsets match { - case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) - case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets()) - case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) - } - logInfo(s"Initial offsets: $offsets") - offsets - } - - override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema - - override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = { - new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss) - } - - override def deserializeOffset(json: String): Offset = { - KafkaSourceOffset(JsonUtils.partitionOffsets(json)) - } - - override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { - val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets - startOffsets.toSeq.map { - case (topicPartition, start) => - KafkaContinuousInputPartition( - topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss) - }.toArray - } - - override def createContinuousReaderFactory( - config: ScanConfig): ContinuousPartitionReaderFactory = { - KafkaContinuousReaderFactory - } - - /** Stop this source and free any resources it has allocated. */ - def stop(): Unit = synchronized { - offsetReader.close() - } - - override def commit(end: Offset): Unit = {} - - override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { - val mergedMap = offsets.map { - case KafkaSourcePartitionOffset(p, o) => Map(p -> o) - }.reduce(_ ++ _) - KafkaSourceOffset(mergedMap) - } - - override def needsReconfiguration(config: ScanConfig): Boolean = { - val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions - offsetReader.fetchLatestOffsets().keySet != knownPartitions - } - - override def toString(): String = s"KafkaSource[$offsetReader]" - - /** - * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`. - * Otherwise, just log a warning. - */ - private def reportDataLoss(message: String): Unit = { - if (failOnDataLoss) { - throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE") - } else { - logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") - } - } -} - -/** - * An input partition for continuous Kafka processing. This will be serialized and transformed - * into a full reader on executors. - * - * @param topicPartition The (topic, partition) pair this task is responsible for. - * @param startOffset The offset to start reading from within the partition. - * @param kafkaParams Kafka consumer params to use. - * @param pollTimeoutMs The timeout for Kafka consumer polling. - * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets - * are skipped. 
- */ -case class KafkaContinuousInputPartition( - topicPartition: TopicPartition, - startOffset: Long, - kafkaParams: ju.Map[String, Object], - pollTimeoutMs: Long, - failOnDataLoss: Boolean) extends InputPartition - -object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory { - override def createReader(partition: InputPartition): ContinuousPartitionReader[InternalRow] = { - val p = partition.asInstanceOf[KafkaContinuousInputPartition] - new KafkaContinuousPartitionReader( - p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, p.failOnDataLoss) - } -} - -class KafkaContinuousScanConfigBuilder( - schema: StructType, - startOffset: Offset, - offsetReader: KafkaOffsetReader, - reportDataLoss: String => Unit) - extends ScanConfigBuilder { - - override def build(): ScanConfig = { - val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset) - - val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet - val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet) - val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq) - - val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) - if (deletedPartitions.nonEmpty) { - reportDataLoss(s"Some partitions were deleted: $deletedPartitions") - } - - val startOffsets = newPartitionOffsets ++ - oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_)) - KafkaContinuousScanConfig(schema, startOffsets) - } -} - -case class KafkaContinuousScanConfig( - readSchema: StructType, - startOffsets: Map[TopicPartition, Long]) - extends ScanConfig { - - // Created when building the scan config builder. If this diverges from the partitions at the - // latest offsets, we need to reconfigure the kafka read support. - def knownPartitions: Set[TopicPartition] = startOffsets.keySet -} - -/** - * A per-task data reader for continuous Kafka processing. - * - * @param topicPartition The (topic, partition) pair this data reader is responsible for. - * @param startOffset The offset to start reading from within the partition. - * @param kafkaParams Kafka consumer params to use. - * @param pollTimeoutMs The timeout for Kafka consumer polling. - * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets - * are skipped. - */ -class KafkaContinuousPartitionReader( - topicPartition: TopicPartition, - startOffset: Long, - kafkaParams: ju.Map[String, Object], - pollTimeoutMs: Long, - failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] { - private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false) - private val converter = new KafkaRecordToUnsafeRowConverter - - private var nextKafkaOffset = startOffset - private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _ - - override def next(): Boolean = { - var r: ConsumerRecord[Array[Byte], Array[Byte]] = null - while (r == null) { - if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false - // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving - // interrupt points to end the query rather than waiting for new data that might never come. - try { - r = consumer.get( - nextKafkaOffset, - untilOffset = Long.MaxValue, - pollTimeoutMs, - failOnDataLoss) - } catch { - // We didn't read within the timeout. We're supposed to block indefinitely for new data, so - // swallow and ignore this. 
- case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException => - - // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range, - // or if it's the endpoint of the data range (i.e. the "true" next offset). - case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => - val range = consumer.getAvailableOffsetRange() - if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) { - // retry - } else { - throw e - } - } - } - nextKafkaOffset = r.offset + 1 - currentRecord = r - true - } - - override def get(): UnsafeRow = { - converter.toUnsafeRow(currentRecord) - } - - override def getOffset(): KafkaSourcePartitionOffset = { - KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset) - } - - override def close(): Unit = { - consumer.release() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala new file mode 100644 index 0000000..8ce56a2 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeoutException + +import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType + +/** + * A [[ContinuousReader]] for data from kafka. + * + * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be + * read by per-task consumers generated later. + * @param kafkaParams String params for per-task Kafka consumers. + * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which + * are not Kafka consumer params. 
+ * @param metadataPath Path to a directory this reader can use for writing metadata. + * @param initialOffsets The Kafka offsets to start reading data at. + * @param failOnDataLoss Flag indicating whether reading should fail in data loss + * scenarios, where some offsets after the specified initial ones can't be + * properly read. + */ +class KafkaContinuousReader( + offsetReader: KafkaOffsetReader, + kafkaParams: ju.Map[String, Object], + sourceOptions: Map[String, String], + metadataPath: String, + initialOffsets: KafkaOffsetRangeLimit, + failOnDataLoss: Boolean) + extends ContinuousReader with Logging { + + private lazy val session = SparkSession.getActiveSession.get + private lazy val sc = session.sparkContext + + private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong + + // Initialized when creating reader factories. If this diverges from the partitions at the latest + // offsets, we need to reconfigure. + // Exposed outside this object only for unit tests. + @volatile private[sql] var knownPartitions: Set[TopicPartition] = _ + + override def readSchema: StructType = KafkaOffsetReader.kafkaSchema + + private var offset: Offset = _ + override def setStartOffset(start: ju.Optional[Offset]): Unit = { + offset = start.orElse { + val offsets = initialOffsets match { + case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) + case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets()) + case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) + } + logInfo(s"Initial offsets: $offsets") + offsets + } + } + + override def getStartOffset(): Offset = offset + + override def deserializeOffset(json: String): Offset = { + KafkaSourceOffset(JsonUtils.partitionOffsets(json)) + } + + override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = { + import scala.collection.JavaConverters._ + + val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset) + + val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet + val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet) + val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq) + + val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) + if (deletedPartitions.nonEmpty) { + reportDataLoss(s"Some partitions were deleted: $deletedPartitions") + } + + val startOffsets = newPartitionOffsets ++ + oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_)) + knownPartitions = startOffsets.keySet + + startOffsets.toSeq.map { + case (topicPartition, start) => + KafkaContinuousInputPartition( + topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss + ): InputPartition[InternalRow] + }.asJava + } + + /** Stop this source and free any resources it has allocated. */ + def stop(): Unit = synchronized { + offsetReader.close() + } + + override def commit(end: Offset): Unit = {} + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { + val mergedMap = offsets.map { + case KafkaSourcePartitionOffset(p, o) => Map(p -> o) + }.reduce(_ ++ _) + KafkaSourceOffset(mergedMap) + } + + override def needsReconfiguration(): Boolean = { + knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions + } + + override def toString(): String = s"KafkaSource[$offsetReader]" + + /** + * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`. 
+ * Otherwise, just log a warning. + */ + private def reportDataLoss(message: String): Unit = { + if (failOnDataLoss) { + throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE") + } else { + logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") + } + } +} + +/** + * An input partition for continuous Kafka processing. This will be serialized and transformed + * into a full reader on executors. + * + * @param topicPartition The (topic, partition) pair this task is responsible for. + * @param startOffset The offset to start reading from within the partition. + * @param kafkaParams Kafka consumer params to use. + * @param pollTimeoutMs The timeout for Kafka consumer polling. + * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets + * are skipped. + */ +case class KafkaContinuousInputPartition( + topicPartition: TopicPartition, + startOffset: Long, + kafkaParams: ju.Map[String, Object], + pollTimeoutMs: Long, + failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] { + + override def createContinuousReader( + offset: PartitionOffset): InputPartitionReader[InternalRow] = { + val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset] + require(kafkaOffset.topicPartition == topicPartition, + s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}") + new KafkaContinuousInputPartitionReader( + topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss) + } + + override def createPartitionReader(): KafkaContinuousInputPartitionReader = { + new KafkaContinuousInputPartitionReader( + topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss) + } +} + +/** + * A per-task data reader for continuous Kafka processing. + * + * @param topicPartition The (topic, partition) pair this data reader is responsible for. + * @param startOffset The offset to start reading from within the partition. + * @param kafkaParams Kafka consumer params to use. + * @param pollTimeoutMs The timeout for Kafka consumer polling. + * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets + * are skipped. + */ +class KafkaContinuousInputPartitionReader( + topicPartition: TopicPartition, + startOffset: Long, + kafkaParams: ju.Map[String, Object], + pollTimeoutMs: Long, + failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[InternalRow] { + private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false) + private val converter = new KafkaRecordToUnsafeRowConverter + + private var nextKafkaOffset = startOffset + private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _ + + override def next(): Boolean = { + var r: ConsumerRecord[Array[Byte], Array[Byte]] = null + while (r == null) { + if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false + // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving + // interrupt points to end the query rather than waiting for new data that might never come. + try { + r = consumer.get( + nextKafkaOffset, + untilOffset = Long.MaxValue, + pollTimeoutMs, + failOnDataLoss) + } catch { + // We didn't read within the timeout. We're supposed to block indefinitely for new data, so + // swallow and ignore this. + case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException => + + // This is a failOnDataLoss exception. 
Retry if nextKafkaOffset is within the data range, + // or if it's the endpoint of the data range (i.e. the "true" next offset). + case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => + val range = consumer.getAvailableOffsetRange() + if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) { + // retry + } else { + throw e + } + } + } + nextKafkaOffset = r.offset + 1 + currentRecord = r + true + } + + override def get(): UnsafeRow = { + converter.toUnsafeRow(currentRecord) + } + + override def getOffset(): KafkaSourcePartitionOffset = { + KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset) + } + + override def close(): Unit = { + consumer.release() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala deleted file mode 100644 index bb4de67..0000000 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.{util => ju} -import java.io._ -import java.nio.charset.StandardCharsets - -import org.apache.commons.io.IOUtils - -import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.ExecutorCacheTaskLocation -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder} -import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport -import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} -import org.apache.spark.sql.sources.v2.DataSourceOptions -import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset} -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.UninterruptibleThread - -/** - * A [[MicroBatchReadSupport]] that reads data from Kafka. - * - * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains - * a map of TopicPartition -> offset. 
Note that this offset is 1 + (available offset). For - * example if the last record in a Kafka topic "t", partition 2 is offset 5, then - * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent - * with the semantics of `KafkaConsumer.position()`. - * - * Zero data lost is not guaranteed when topics are deleted. If zero data lost is critical, the user - * must make sure all messages in a topic have been processed when deleting a topic. - * - * There is a known issue caused by KAFKA-1894: the query using Kafka maybe cannot be stopped. - * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers - * and not use wrong broker addresses. - */ -private[kafka010] class KafkaMicroBatchReadSupport( - kafkaOffsetReader: KafkaOffsetReader, - executorKafkaParams: ju.Map[String, Object], - options: DataSourceOptions, - metadataPath: String, - startingOffsets: KafkaOffsetRangeLimit, - failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging { - - private val pollTimeoutMs = options.getLong( - "kafkaConsumer.pollTimeoutMs", - SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L) - - private val maxOffsetsPerTrigger = - Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong) - - private val rangeCalculator = KafkaOffsetRangeCalculator(options) - - private var endPartitionOffsets: KafkaSourceOffset = _ - - /** - * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only - * called in StreamExecutionThread. Otherwise, interrupting a thread while running - * `KafkaConsumer.poll` may hang forever (KAFKA-1894). - */ - override def initialOffset(): Offset = { - KafkaSourceOffset(getOrCreateInitialPartitionOffsets()) - } - - override def latestOffset(start: Offset): Offset = { - val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets - val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets() - endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets => - rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets) - }.getOrElse { - latestPartitionOffsets - }) - endPartitionOffsets - } - - override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema - - override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = { - new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end)) - } - - override def planInputPartitions(config: ScanConfig): Array[InputPartition] = { - val sc = config.asInstanceOf[SimpleStreamingScanConfig] - val startPartitionOffsets = sc.start.asInstanceOf[KafkaSourceOffset].partitionToOffsets - val endPartitionOffsets = sc.end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets - - // Find the new partitions, and get their earliest offsets - val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet) - val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq) - if (newPartitionInitialOffsets.keySet != newPartitions) { - // We cannot get from offsets for some partitions. It means they got deleted. - val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet) - reportDataLoss( - s"Cannot find earliest offsets of ${deletedPartitions}. 
Some data may have been missed") - } - logInfo(s"Partitions added: $newPartitionInitialOffsets") - newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) => - reportDataLoss( - s"Added partition $p starts from $o instead of 0. Some data may have been missed") - } - - // Find deleted partitions, and report data loss if required - val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet) - if (deletedPartitions.nonEmpty) { - reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed") - } - - // Use the end partitions to calculate offset ranges to ignore partitions that have - // been deleted - val topicPartitions = endPartitionOffsets.keySet.filter { tp => - // Ignore partitions that we don't know the from offsets. - newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp) - }.toSeq - logDebug("TopicPartitions: " + topicPartitions.mkString(", ")) - - // Calculate offset ranges - val offsetRanges = rangeCalculator.getRanges( - fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets, - untilOffsets = endPartitionOffsets, - executorLocations = getSortedExecutorList()) - - // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions, - // that is, concurrent tasks will not read the same TopicPartitions. - val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size - - // Generate factories based on the offset ranges - offsetRanges.map { range => - KafkaMicroBatchInputPartition( - range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer) - }.toArray - } - - override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = { - KafkaMicroBatchReaderFactory - } - - override def deserializeOffset(json: String): Offset = { - KafkaSourceOffset(JsonUtils.partitionOffsets(json)) - } - - override def commit(end: Offset): Unit = {} - - override def stop(): Unit = { - kafkaOffsetReader.close() - } - - override def toString(): String = s"KafkaV2[$kafkaOffsetReader]" - - /** - * Read initial partition offsets from the checkpoint, or decide the offsets and write them to - * the checkpoint. - */ - private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = { - // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread. - // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever - // (KAFKA-1894). 
- assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) - - // SparkSession is required for getting Hadoop configuration for writing to checkpoints - assert(SparkSession.getActiveSession.nonEmpty) - - val metadataLog = - new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath) - metadataLog.get(0).getOrElse { - val offsets = startingOffsets match { - case EarliestOffsetRangeLimit => - KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets()) - case LatestOffsetRangeLimit => - KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets()) - case SpecificOffsetRangeLimit(p) => - kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) - } - metadataLog.add(0, offsets) - logInfo(s"Initial offsets: $offsets") - offsets - }.partitionToOffsets - } - - /** Proportionally distribute limit number of offsets among topicpartitions */ - private def rateLimit( - limit: Long, - from: PartitionOffsetMap, - until: PartitionOffsetMap): PartitionOffsetMap = { - val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq) - val sizes = until.flatMap { - case (tp, end) => - // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it - from.get(tp).orElse(fromNew.get(tp)).flatMap { begin => - val size = end - begin - logDebug(s"rateLimit $tp size is $size") - if (size > 0) Some(tp -> size) else None - } - } - val total = sizes.values.sum.toDouble - if (total < 1) { - until - } else { - until.map { - case (tp, end) => - tp -> sizes.get(tp).map { size => - val begin = from.get(tp).getOrElse(fromNew(tp)) - val prorate = limit * (size / total) - // Don't completely starve small topicpartitions - val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong - // Paranoia, make sure not to return an offset that's past end - Math.min(end, off) - }.getOrElse(end) - } - } - } - - private def getSortedExecutorList(): Array[String] = { - - def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = { - if (a.host == b.host) { - a.executorId > b.executorId - } else { - a.host > b.host - } - } - - val bm = SparkEnv.get.blockManager - bm.master.getPeers(bm.blockManagerId).toArray - .map(x => ExecutorCacheTaskLocation(x.host, x.executorId)) - .sortWith(compare) - .map(_.toString) - } - - /** - * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`. - * Otherwise, just log a warning. - */ - private def reportDataLoss(message: String): Unit = { - if (failOnDataLoss) { - throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE") - } else { - logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") - } - } - - /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. 
*/ - class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String) - extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) { - - val VERSION = 1 - - override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { - out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517) - val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) - writer.write("v" + VERSION + "\n") - writer.write(metadata.json) - writer.flush - } - - override def deserialize(in: InputStream): KafkaSourceOffset = { - in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517) - val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8)) - // HDFSMetadataLog guarantees that it never creates a partial file. - assert(content.length != 0) - if (content(0) == 'v') { - val indexOfNewLine = content.indexOf("\n") - if (indexOfNewLine > 0) { - val version = parseVersion(content.substring(0, indexOfNewLine), VERSION) - KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) - } else { - throw new IllegalStateException( - s"Log file was malformed: failed to detect the log file version line.") - } - } else { - // The log was generated by Spark 2.1.0 - KafkaSourceOffset(SerializedOffset(content)) - } - } - } -} - -/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */ -private[kafka010] case class KafkaMicroBatchInputPartition( - offsetRange: KafkaOffsetRange, - executorKafkaParams: ju.Map[String, Object], - pollTimeoutMs: Long, - failOnDataLoss: Boolean, - reuseKafkaConsumer: Boolean) extends InputPartition - -private[kafka010] object KafkaMicroBatchReaderFactory extends PartitionReaderFactory { - override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - val p = partition.asInstanceOf[KafkaMicroBatchInputPartition] - KafkaMicroBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs, - p.failOnDataLoss, p.reuseKafkaConsumer) - } -} - -/** A [[PartitionReader]] for reading Kafka data in a micro-batch streaming query. 
*/ -private[kafka010] case class KafkaMicroBatchPartitionReader( - offsetRange: KafkaOffsetRange, - executorKafkaParams: ju.Map[String, Object], - pollTimeoutMs: Long, - failOnDataLoss: Boolean, - reuseKafkaConsumer: Boolean) extends PartitionReader[InternalRow] with Logging { - - private val consumer = KafkaDataConsumer.acquire( - offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer) - - private val rangeToRead = resolveRange(offsetRange) - private val converter = new KafkaRecordToUnsafeRowConverter - - private var nextOffset = rangeToRead.fromOffset - private var nextRow: UnsafeRow = _ - - override def next(): Boolean = { - if (nextOffset < rangeToRead.untilOffset) { - val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss) - if (record != null) { - nextRow = converter.toUnsafeRow(record) - nextOffset = record.offset + 1 - true - } else { - false - } - } else { - false - } - } - - override def get(): UnsafeRow = { - assert(nextRow != null) - nextRow - } - - override def close(): Unit = { - consumer.release() - } - - private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = { - if (range.fromOffset < 0 || range.untilOffset < 0) { - // Late bind the offset range - val availableOffsetRange = consumer.getAvailableOffsetRange() - val fromOffset = if (range.fromOffset < 0) { - assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST, - s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}") - availableOffsetRange.earliest - } else { - range.fromOffset - } - val untilOffset = if (range.untilOffset < 0) { - assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST, - s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}") - availableOffsetRange.latest - } else { - range.untilOffset - } - KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None) - } else { - range - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala new file mode 100644 index 0000000..8cc989f --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.io._ +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.IOUtils + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.UninterruptibleThread + +/** + * A [[MicroBatchReader]] that reads data from Kafka. + * + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * Zero data lost is not guaranteed when topics are deleted. If zero data lost is critical, the user + * must make sure all messages in a topic have been processed when deleting a topic. + * + * There is a known issue caused by KAFKA-1894: the query using Kafka maybe cannot be stopped. + * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers + * and not use wrong broker addresses. + */ +private[kafka010] class KafkaMicroBatchReader( + kafkaOffsetReader: KafkaOffsetReader, + executorKafkaParams: ju.Map[String, Object], + options: DataSourceOptions, + metadataPath: String, + startingOffsets: KafkaOffsetRangeLimit, + failOnDataLoss: Boolean) + extends MicroBatchReader with Logging { + + private var startPartitionOffsets: PartitionOffsetMap = _ + private var endPartitionOffsets: PartitionOffsetMap = _ + + private val pollTimeoutMs = options.getLong( + "kafkaConsumer.pollTimeoutMs", + SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L) + + private val maxOffsetsPerTrigger = + Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong) + + private val rangeCalculator = KafkaOffsetRangeCalculator(options) + /** + * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only + * called in StreamExecutionThread. Otherwise, interrupting a thread while running + * `KafkaConsumer.poll` may hang forever (KAFKA-1894). 
+ */ + private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets() + + override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = { + // Make sure initialPartitionOffsets is initialized + initialPartitionOffsets + + startPartitionOffsets = Option(start.orElse(null)) + .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets) + .getOrElse(initialPartitionOffsets) + + endPartitionOffsets = Option(end.orElse(null)) + .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets) + .getOrElse { + val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets() + maxOffsetsPerTrigger.map { maxOffsets => + rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets) + }.getOrElse { + latestPartitionOffsets + } + } + } + + override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = { + // Find the new partitions, and get their earliest offsets + val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet) + val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq) + if (newPartitionInitialOffsets.keySet != newPartitions) { + // We cannot get from offsets for some partitions. It means they got deleted. + val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet) + reportDataLoss( + s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed") + } + logInfo(s"Partitions added: $newPartitionInitialOffsets") + newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) => + reportDataLoss( + s"Added partition $p starts from $o instead of 0. Some data may have been missed") + } + + // Find deleted partitions, and report data loss if required + val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet) + if (deletedPartitions.nonEmpty) { + reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed") + } + + // Use the end partitions to calculate offset ranges to ignore partitions that have + // been deleted + val topicPartitions = endPartitionOffsets.keySet.filter { tp => + // Ignore partitions that we don't know the from offsets. + newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp) + }.toSeq + logDebug("TopicPartitions: " + topicPartitions.mkString(", ")) + + // Calculate offset ranges + val offsetRanges = rangeCalculator.getRanges( + fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets, + untilOffsets = endPartitionOffsets, + executorLocations = getSortedExecutorList()) + + // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions, + // that is, concurrent tasks will not read the same TopicPartitions. 
+ val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size + + // Generate factories based on the offset ranges + offsetRanges.map { range => + new KafkaMicroBatchInputPartition( + range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer + ): InputPartition[InternalRow] + }.asJava + } + + override def getStartOffset: Offset = { + KafkaSourceOffset(startPartitionOffsets) + } + + override def getEndOffset: Offset = { + KafkaSourceOffset(endPartitionOffsets) + } + + override def deserializeOffset(json: String): Offset = { + KafkaSourceOffset(JsonUtils.partitionOffsets(json)) + } + + override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema + + override def commit(end: Offset): Unit = {} + + override def stop(): Unit = { + kafkaOffsetReader.close() + } + + override def toString(): String = s"KafkaV2[$kafkaOffsetReader]" + + /** + * Read initial partition offsets from the checkpoint, or decide the offsets and write them to + * the checkpoint. + */ + private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = { + // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread. + // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever + // (KAFKA-1894). + assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) + + // SparkSession is required for getting Hadoop configuration for writing to checkpoints + assert(SparkSession.getActiveSession.nonEmpty) + + val metadataLog = + new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath) + metadataLog.get(0).getOrElse { + val offsets = startingOffsets match { + case EarliestOffsetRangeLimit => + KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets()) + case LatestOffsetRangeLimit => + KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets()) + case SpecificOffsetRangeLimit(p) => + kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) + } + metadataLog.add(0, offsets) + logInfo(s"Initial offsets: $offsets") + offsets + }.partitionToOffsets + } + + /** Proportionally distribute limit number of offsets among topicpartitions */ + private def rateLimit( + limit: Long, + from: PartitionOffsetMap, + until: PartitionOffsetMap): PartitionOffsetMap = { + val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq) + val sizes = until.flatMap { + case (tp, end) => + // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it + from.get(tp).orElse(fromNew.get(tp)).flatMap { begin => + val size = end - begin + logDebug(s"rateLimit $tp size is $size") + if (size > 0) Some(tp -> size) else None + } + } + val total = sizes.values.sum.toDouble + if (total < 1) { + until + } else { + until.map { + case (tp, end) => + tp -> sizes.get(tp).map { size => + val begin = from.get(tp).getOrElse(fromNew(tp)) + val prorate = limit * (size / total) + // Don't completely starve small topicpartitions + val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong + // Paranoia, make sure not to return an offset that's past end + Math.min(end, off) + }.getOrElse(end) + } + } + } + + private def getSortedExecutorList(): Array[String] = { + + def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = { + if (a.host == b.host) { + a.executorId > b.executorId + } else { + a.host > b.host + } + } + + val bm = SparkEnv.get.blockManager + bm.master.getPeers(bm.blockManagerId).toArray + .map(x => 
ExecutorCacheTaskLocation(x.host, x.executorId)) + .sortWith(compare) + .map(_.toString) + } + + /** + * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`. + * Otherwise, just log a warning. + */ + private def reportDataLoss(message: String): Unit = { + if (failOnDataLoss) { + throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE") + } else { + logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") + } + } + + /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */ + class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String) + extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) { + + val VERSION = 1 + + override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { + out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517) + val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) + writer.write("v" + VERSION + "\n") + writer.write(metadata.json) + writer.flush + } + + override def deserialize(in: InputStream): KafkaSourceOffset = { + in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517) + val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8)) + // HDFSMetadataLog guarantees that it never creates a partial file. + assert(content.length != 0) + if (content(0) == 'v') { + val indexOfNewLine = content.indexOf("\n") + if (indexOfNewLine > 0) { + val version = parseVersion(content.substring(0, indexOfNewLine), VERSION) + KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) + } else { + throw new IllegalStateException( + s"Log file was malformed: failed to detect the log file version line.") + } + } else { + // The log was generated by Spark 2.1.0 + KafkaSourceOffset(SerializedOffset(content)) + } + } + } +} + +/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */ +private[kafka010] case class KafkaMicroBatchInputPartition( + offsetRange: KafkaOffsetRange, + executorKafkaParams: ju.Map[String, Object], + pollTimeoutMs: Long, + failOnDataLoss: Boolean, + reuseKafkaConsumer: Boolean) extends InputPartition[InternalRow] { + + override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray + + override def createPartitionReader(): InputPartitionReader[InternalRow] = + new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs, + failOnDataLoss, reuseKafkaConsumer) +} + +/** A [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. 
+
+/** An [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartition(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends InputPartition[InternalRow] {
+
+  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
+
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
+    new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
+      failOnDataLoss, reuseKafkaConsumer)
+}
+
+/** An [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartitionReader(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends InputPartitionReader[InternalRow] with Logging {
+
+  private val consumer = KafkaDataConsumer.acquire(
+    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
+
+  private val rangeToRead = resolveRange(offsetRange)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextOffset = rangeToRead.fromOffset
+  private var nextRow: UnsafeRow = _
+
+  override def next(): Boolean = {
+    if (nextOffset < rangeToRead.untilOffset) {
+      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+      if (record != null) {
+        nextRow = converter.toUnsafeRow(record)
+        nextOffset = record.offset + 1
+        true
+      } else {
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  override def get(): UnsafeRow = {
+    assert(nextRow != null)
+    nextRow
+  }
+
+  override def close(): Unit = {
+    consumer.release()
+  }
+
+  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+    if (range.fromOffset < 0 || range.untilOffset < 0) {
+      // Late bind the offset range
+      val availableOffsetRange = consumer.getAvailableOffsetRange()
+      val fromOffset = if (range.fromOffset < 0) {
+        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
+        availableOffsetRange.earliest
+      } else {
+        range.fromOffset
+      }
+      val untilOffset = if (range.untilOffset < 0) {
+        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
+        availableOffsetRange.latest
+      } else {
+        range.untilOffset
+      }
+      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
+    } else {
+      range
+    }
+  }
+}
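The partition reader above follows the pull-style contract of InputPartitionReader: next() advances to the following record (returning false at the end of the range, or when tolerated data loss cuts the range short), get() returns the current row, and close() releases the possibly shared consumer. A hypothetical consumption loop, roughly what Spark's per-partition scan task does:

    import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

    // Illustrative driver-side loop; a real query runs the equivalent on executors.
    def drain[T](reader: InputPartitionReader[T])(handle: T => Unit): Unit = {
      try {
        while (reader.next()) { // false at end of range or on tolerated data loss
          handle(reader.get())  // get() is only defined after next() returned true
        }
      } finally {
        reader.close()          // releases or returns the underlying Kafka consumer
      }
    }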

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 28c9853..d225c1e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,8 +30,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -45,9 +46,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with StreamingWriteSupportProvider
-    with ContinuousReadSupportProvider
-    with MicroBatchReadSupportProvider
+    with StreamWriteSupport
+    with ContinuousReadSupport
+    with MicroBatchReadSupport
     with Logging {
   import KafkaSourceProvider._
@@ -107,12 +108,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
-   * batches of Kafka data in a micro-batch streaming query.
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
+   * of Kafka data in a micro-batch streaming query.
    */
-  override def createMicroBatchReadSupport(
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
       metadataPath: String,
-      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
+      options: DataSourceOptions): KafkaMicroBatchReader = {
     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
@@ -138,7 +140,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       parameters,
       driverGroupIdPrefix = s"$uniqueGroupId-driver")

-    new KafkaMicroBatchReadSupport(
+    new KafkaMicroBatchReader(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       options,
@@ -148,12 +150,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
+   * Creates a [[ContinuousInputPartitionReader]] to read
    * Kafka data in a continuous streaming query.
    */
-  override def createContinuousReadSupport(
+  override def createContinuousReader(
+      schema: Optional[StructType],
       metadataPath: String,
-      options: DataSourceOptions): KafkaContinuousReadSupport = {
+      options: DataSourceOptions): KafkaContinuousReader = {
     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
     // Each running query should use its own group id. Otherwise, the query may be only assigned
@@ -178,7 +181,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       parameters,
       driverGroupIdPrefix = s"$uniqueGroupId-driver")

-    new KafkaContinuousReadSupport(
+    new KafkaContinuousReader(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       parameters,
@@ -267,11 +270,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
   }

-  override def createStreamingWriteSupport(
+  override def createStreamWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceOptions): StreamingWriteSupport = {
+      options: DataSourceOptions): StreamWriter = {
     import scala.collection.JavaConverters._

     val spark = SparkSession.getActiveSession.get
@@ -282,7 +285,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     KafkaWriter.validateQuery(
       schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)

-    new KafkaStreamingWriteSupport(topic, producerParams, schema)
+    new KafkaStreamWriter(topic, producerParams, schema)
   }

   private def strategy(caseInsensitiveParams: Map[String, String]) =

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
new file mode 100644
index 0000000..97c577d
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory.
+ *
+ * @param topic The topic this writer is responsible for. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaStreamWriter(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends StreamWriter {
+
+  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
+
+  override def createWriterFactory(): KafkaStreamWriterFactory =
+    KafkaStreamWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to
+ * generate the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaStreamWriterFactory(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] = {
+    new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ *                    from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaStreamDataWriter(
+    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+    new java.util.HashMap[String, Object](producerParams.asJava))
+
+  def write(row: InternalRow): Unit = {
+    checkForErrors()
+    sendRow(row, producer)
+  }
+
+  def commit(): WriterCommitMessage = {
+    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
+    // This requires flushing and then checking that no callbacks produced errors.
+    // We also check for errors beforehand, to fail as soon as possible - the check is cheap.
+    checkForErrors()
+    producer.flush()
+    checkForErrors()
+    KafkaWriterCommitMessage
+  }
+
+  def abort(): Unit = {}
+
+  def close(): Unit = {
+    checkForErrors()
+    if (producer != null) {
+      producer.flush()
+      checkForErrors()
+      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
+    }
+  }
+}
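The check-flush-check sequence in commit() above is what turns Kafka's asynchronous send into an at-least-once epoch commit: flush() blocks until every buffered record is acknowledged or failed, and the second check surfaces any callback failure before the task reports success. Reduced to plain kafka-clients calls, with an illustrative error holder standing in for what KafkaRowWriter tracks internally:

    import java.util.concurrent.atomic.AtomicReference

    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, RecordMetadata}

    class SendErrorTracker {
      private val failure = new AtomicReference[Exception](null)

      // Passed as producer.send(record, callback); keeps only the first failure.
      val callback: Callback = new Callback {
        override def onCompletion(metadata: RecordMetadata, e: Exception): Unit =
          if (e != null) failure.compareAndSet(null, e)
      }

      def checkForErrors(): Unit = {
        val e = failure.get()
        if (e != null) throw e
      }
    }

    def commitEpoch(
        producer: KafkaProducer[Array[Byte], Array[Byte]],
        tracker: SendErrorTracker): Unit = {
      tracker.checkForErrors() // cheap pre-check: fail fast on an earlier send error
      producer.flush()         // block until every buffered record is acked or failed
      tracker.checkForErrors() // only now are all rows really in Kafka
    }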

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
deleted file mode 100644
index 927c56d..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
-import org.apache.spark.sql.types.StructType
-
-/**
- * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
- * don't need to really send one.
- */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
-
-/**
- * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory.
- *
- * @param topic The topic this writer is responsible for. If None, topic will be inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-class KafkaStreamingWriteSupport(
-    topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends StreamingWriteSupport {
-
-  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
-
-  override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
-    KafkaStreamWriterFactory(topic, producerParams, schema)
-
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-}
-
-/**
- * A [[StreamingDataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to
- * generate the per-task data writers.
- * @param topic The topic that should be written to. If None, topic will be inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-case class KafkaStreamWriterFactory(
-    topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends StreamingDataWriterFactory {
-
-  override def createWriter(
-      partitionId: Int,
-      taskId: Long,
-      epochId: Long): DataWriter[InternalRow] = {
-    new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
-  }
-}
-
-/**
- * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
- * process incoming rows.
- *
- * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
- *                    from a `topic` field in the incoming data.
- * @param producerParams Parameters to use for the Kafka producer.
- * @param inputSchema The attributes in the input data.
- */
-class KafkaStreamDataWriter(
-    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
-  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
-  import scala.collection.JavaConverters._
-
-  private lazy val producer = CachedKafkaProducer.getOrCreate(
-    new java.util.HashMap[String, Object](producerParams.asJava))
-
-  def write(row: InternalRow): Unit = {
-    checkForErrors()
-    sendRow(row, producer)
-  }
-
-  def commit(): WriterCommitMessage = {
-    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
-    // This requires flushing and then checking that no callbacks produced errors.
-    // We also check for errors before to fail as soon as possible - the check is cheap.
-    checkForErrors()
-    producer.flush()
-    checkForErrors()
-    KafkaWriterCommitMessage
-  }
-
-  def abort(): Unit = {}
-
-  def close(): Unit = {
-    checkForErrors()
-    if (producer != null) {
-      producer.flush()
-      checkForErrors()
-      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index af51021..a0e5818 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.clients.producer.ProducerRecord

 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.streaming.Trigger
@@ -207,13 +207,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
     testUtils.createTopic(topic2, partitions = 5)
     eventually(timeout(streamingTimeout)) {
       assert(
-        query.lastExecution.executedPlan.collectFirst {
-          case scan: DataSourceV2ScanExec
-              if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-            scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
-        }.exists { config =>
+        query.lastExecution.logical.collectFirst {
+          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
+        }.exists { r =>
           // Ensure the new topic is present and the old topic is gone.
-          config.knownPartitions.exists(_.topic == topic2)
+          r.knownPartitions.exists(_.topic == topic2)
         },
         s"query never reconfigured to new topic $topic2")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa6bdc2..fa1468a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger

 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.streaming.Trigger
@@ -46,10 +46,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     testUtils.addPartitions(topic, newCount)
     eventually(timeout(streamingTimeout)) {
       assert(
-        query.lastExecution.executedPlan.collectFirst {
-          case scan: DataSourceV2ScanExec
-              if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-            scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
+        query.lastExecution.logical.collectFirst {
+          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 8e246db..65615fd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010

 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.{Files, Paths}
-import java.util.Locale
+import java.util.{Locale, Optional}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random

-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
@@ -40,9 +40,11 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType

 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
@@ -112,16 +114,14 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
       query.nonEmpty,
       "Cannot add data when there is no query for finding the active kafka source")

-    val sources: Seq[BaseStreamingSource] = {
+    val sources = {
       query.get.logicalPlan.collect {
         case StreamingExecutionRelation(source: KafkaSource, _) => source
-        case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source
+        case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
       } ++ (query.get.lastExecution match {
         case null => Seq()
         case e => e.logical.collect {
-          case r: StreamingDataSourceV2Relation
-              if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-            r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
+          case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
         }
       })
     }.distinct
@@ -905,7 +905,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.collect {
-          case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true
+          case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
         }.nonEmpty
       }
     )
@@ -930,16 +930,17 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
         "kafka.bootstrap.servers" -> testUtils.brokerAddress,
         "subscribe" -> topic
       ) ++ Option(minPartitions).map { p => "minPartitions" -> p}
-      val readSupport = provider.createMicroBatchReadSupport(
-        dir.getAbsolutePath, new DataSourceOptions(options.asJava))
-      val config = readSupport.newScanConfigBuilder(
-        KafkaSourceOffset(Map(tp -> 0L)),
-        KafkaSourceOffset(Map(tp -> 100L))).build()
-      val inputPartitions = readSupport.planInputPartitions(config)
+      val reader = provider.createMicroBatchReader(
+        Optional.empty[StructType], dir.getAbsolutePath, new DataSourceOptions(options.asJava))
+      reader.setOffsetRange(
+        Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
+        Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
+      )
+      val factories = reader.planInputPartitions().asScala
         .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
-      withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") {
-        assert(inputPartitions.size == numPartitionsGenerated)
-        inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
+      withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
+        assert(factories.size == numPartitionsGenerated)
+        factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
       }
     }
   }
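The last test above doubles as a usage map for the restored micro-batch API: create the reader, pin an offset range, plan input partitions, then read each one. A condensed driver-side sketch of that flow, assuming code living in the org.apache.spark.sql.kafka010 package (KafkaSourceProvider is private[kafka010]), an active SparkSession, and placeholder broker address, topic, and metadata path:

    import java.util.Optional

    import scala.collection.JavaConverters._

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.sources.v2.DataSourceOptions
    import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
    import org.apache.spark.sql.types.StructType

    val tp = new TopicPartition("some-topic", 0)
    val options = new DataSourceOptions(Map(
      "kafka.bootstrap.servers" -> "localhost:9092",
      "subscribe" -> "some-topic").asJava)

    val reader = new KafkaSourceProvider().createMicroBatchReader(
      Optional.empty[StructType], "/tmp/checkpoint/sources/0", options)

    // Pin an explicit range; empty optionals would make the reader resolve
    // checkpointed or latest offsets itself on the stream execution thread.
    reader.setOffsetRange(
      Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
      Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L))))

    reader.planInputPartitions().asScala.foreach { partition =>
      val r = partition.createPartitionReader() // on an executor in a real query
      try { while (r.next()) { /* consume r.get() */ } } finally { r.close() }
    }
    reader.stop()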