This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.hadoop;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
import org.apache.carbondata.core.util.CarbonUtil;

/**
 * Java-serializable wrapper around a list of {@link CarbonInputSplit}.
 *
 * <p>{@code CarbonInputSplit} is a Hadoop {@code Writable}, which Spark's
 * KryoSerializer cannot broadcast directly. This wrapper eagerly serializes
 * the splits into a plain {@code byte[]} via their {@code Writable} contract,
 * so the broadcast payload is serializer-agnostic; executors call
 * {@link #getInputSplit()} to rebuild the split list on the other side.
 *
 * <p>Instances are immutable once constructed and safe to broadcast.
 */
public class CarbonInputSplitWrapper implements Serializable {

  // Explicit UID: this object crosses JVM boundaries via Spark broadcast,
  // so the serialized form must stay stable across recompiles.
  private static final long serialVersionUID = 1L;

  // Concatenated Writable encoding of all splits. NOTE(review): this is the
  // raw internal buffer of ExtendedByteArrayOutputStream and may contain
  // unused trailing capacity; deserialization is bounded by 'size', so the
  // extra bytes are never read, only carried.
  private final byte[] data;

  // Number of splits encoded in 'data'; drives the read loop on decode.
  private final int size;

  /**
   * Serializes the given splits into an internal byte array.
   *
   * @param inputSplitList splits to wrap; must not be null
   * @throws RuntimeException wrapping any {@link IOException} raised while
   *         writing a split (should not occur for in-memory streams)
   */
  public CarbonInputSplitWrapper(List<CarbonInputSplit> inputSplitList) {
    Objects.requireNonNull(inputSplitList, "inputSplitList must not be null");
    ExtendedByteArrayOutputStream stream = new ExtendedByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(stream);
    try {
      for (CarbonInputSplit carbonInputSplit : inputSplitList) {
        carbonInputSplit.write(dos);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      CarbonUtil.closeStreams(dos);
    }
    this.data = stream.getBuffer();
    this.size = inputSplitList.size();
  }

  /**
   * Rebuilds the wrapped splits from the internal byte array.
   *
   * @return a new mutable list containing {@code size} deserialized splits,
   *         in the order they were written
   * @throws RuntimeException wrapping any {@link IOException} raised while
   *         reading a split (indicates a corrupted payload)
   */
  public List<CarbonInputSplit> getInputSplit() {
    ByteArrayInputStream stream = new ByteArrayInputStream(data);
    DataInputStream dis = new DataInputStream(stream);
    // Presize: the exact element count is known up front.
    List<CarbonInputSplit> splits = new ArrayList<>(size);
    try {
      for (int i = 0; i < size; i++) {
        CarbonInputSplit split = new CarbonInputSplit();
        split.readFields(dis);
        splits.add(split);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      CarbonUtil.closeStreams(dis);
    }
    return splits;
  }
}
org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection} +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonInputSplitWrapper, CarbonMultiBlockSplit, CarbonProjection} import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo} import org.apache.carbondata.processing.loading.TableProcessingOperations @@ -89,10 +89,10 @@ class CarbonMergerRDD[K, V]( var rangeColumn: CarbonColumn = null var singleRange = false var expressionMapForRangeCol: util.Map[Integer, Expression] = null - var broadCastSplits: Broadcast[util.List[CarbonInputSplit]] = null + var broadCastSplits: Broadcast[CarbonInputSplitWrapper] = null def makeBroadCast(splits: util.List[CarbonInputSplit]): Unit = { - broadCastSplits = sparkContext.broadcast(splits) + broadCastSplits = sparkContext.broadcast(new CarbonInputSplitWrapper(splits)) } override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { @@ -121,7 +121,7 @@ class CarbonMergerRDD[K, V]( // all the splits) carbonSparkPartition.split.value.getAllSplits } else { - broadCastSplits.value + broadCastSplits.value.getInputSplit } val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)