[CARBONDATA-1173] Stream ingestion - write path framework This closes #1064
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/772efdd5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/772efdd5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/772efdd5 Branch: refs/heads/streaming_ingest Commit: 772efdd505cbd0d13a0260f4ede80c817f63ae36 Parents: d4eabbe Author: Aniket Adnaik <[email protected]> Authored: Thu Jun 15 11:57:43 2017 -0700 Committer: Jacky Li <[email protected]> Committed: Thu Sep 21 08:26:44 2017 +0800 ---------------------------------------------------------------------- .../streaming/CarbonStreamingCommitInfo.java | 108 ++++++++++ .../streaming/CarbonStreamingConstants.java | 25 +++ .../streaming/CarbonStreamingMetaStore.java | 40 ++++ .../streaming/CarbonStreamingMetaStoreImpl.java | 56 ++++++ .../core/util/path/CarbonTablePath.java | 10 + .../streaming/CarbonStreamingOutputFormat.java | 66 +++++++ .../streaming/CarbonStreamingRecordWriter.java | 196 +++++++++++++++++++ .../org/apache/spark/sql/CarbonSource.scala | 41 +++- .../CarbonStreamingOutpurWriteFactory.scala | 88 +++++++++ .../streaming/CarbonStreamingOutputWriter.scala | 98 ++++++++++ 10 files changed, 720 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java new file mode 100644 index 0000000..6cf303a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.streaming; + +/** + * Commit info for streaming writes + * The commit info can be used to recover valid offset in the file + * in the case of write failure. + */ +public class CarbonStreamingCommitInfo { + + private String dataBase; + + private String table; + + private long commitTime; + + private long segmentID; + + private String partitionID; + + private long batchID; + + private String fileOffset; + + private long transactionID; // future use + + public CarbonStreamingCommitInfo( + + String dataBase, + + String table, + + long commitTime, + + long segmentID, + + String partitionID, + + long batchID) { + + this.dataBase = dataBase; + + this.table = table; + + this.commitTime = commitTime; + + this.segmentID = segmentID; + + this.partitionID = partitionID; + + this.batchID = batchID; + + this.transactionID = -1; + } + + public String getDataBase() { + return dataBase; + } + + public String getTable() { + return table; + } + + public long getCommitTime() { + return commitTime; + } + + public long getSegmentID() { + return segmentID; + } + + public String getPartitionID() { + return partitionID; + } + + public long getBatchID() { + return batchID; + } + + public String getFileOffset() { + return fileOffset; + } + + 
public long getTransactionID() { + return transactionID; + } + + @Override + public String toString() { + return dataBase + "." + table + "." + segmentID + "$" + partitionID; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java new file mode 100644 index 0000000..db7186f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.streaming; + +public class CarbonStreamingConstants { + + public static final long DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE = 1024 * 1024 * 1024; // 1GB + +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java new file mode 100644 index 0000000..fa3746c --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.streaming; + + +import java.io.IOException; + +/** + * Generic interface for storing commit info for streaming ingest + */ +public interface CarbonStreamingMetaStore { + + public CarbonStreamingCommitInfo getStreamingCommitInfo( + String dataBase, + String table, + long segmentID, + String partitionID) throws IOException; + + public void updateStreamingCommitInfo( + CarbonStreamingCommitInfo commitInfo) throws IOException; + + public void recoverStreamingData( + CarbonStreamingCommitInfo commitInfo) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java new file mode 100644 index 0000000..0afe962 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.streaming; + +import java.io.IOException; + +/** + * JSON format can be used to store the metadata + */ +public class CarbonStreamingMetaStoreImpl implements CarbonStreamingMetaStore { + + /** + * get commit info from metastore + */ + public CarbonStreamingCommitInfo getStreamingCommitInfo( + String dataBase, + String table, + long segmentID, + String partitionID) throws IOException { + + return null; + + } + + /** + * Update commit info in metastore + */ + public void updateStreamingCommitInfo( + CarbonStreamingCommitInfo commitInfo) throws IOException { + + } + + /** + * Recover streaming data using valid offset in commit info + */ + public void recoverStreamingData( + CarbonStreamingCommitInfo commitInfo) throws IOException { + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 0910afc..8f4fa26 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -51,6 +51,16 @@ public class CarbonTablePath extends Path { protected static final String INDEX_FILE_EXT = ".carbonindex"; protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta"; + /** + * Streaming ingest related paths + */ + protected static final String STREAM_PREFIX = "Streaming"; + protected static final String STREAM_FILE_NAME_EXT = ".carbondata.stream"; + protected static final String STREAM_FILE_BEING_WRITTEN = "in-progress.carbondata.stream"; + protected static final String STREAM_FILE_BEING_WRITTEN_META = "in-progress.meta"; + protected static final String 
STREAM_COMPACTION_STATUS = "streaming_compaction_status"; + protected static final String STREAM_FILE_LOCK = "streaming_in_use.lock"; + protected String tablePath; protected CarbonTableIdentifier carbonTableIdentifier; http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java new file mode 100644 index 0000000..fc6f455 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.IOException; + +import org.apache.carbondata.core.streaming.CarbonStreamingConstants; +import org.apache.carbondata.processing.csvload.CSVInputFormat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + + +/** + * Output format to write streaming data to carbondata file + * + * @param <V> - type of record + */ +public class CarbonStreamingOutputFormat<K, V> extends FileOutputFormat<K, V> { + + public static long getBlockSize(Configuration conf) { + return conf.getLong("dfs.block.size", + CarbonStreamingConstants.DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE); + } + + public static void setBlockSize(Configuration conf, long blockSize) { + conf.setLong("dfs.block.size", blockSize); + } + + /** + * When getRecordWriter may need to override + * to provide correct path including streaming segment name + */ + @Override + public CarbonStreamingRecordWriter<K, V> getRecordWriter(TaskAttemptContext job) + throws IOException, InterruptedException { + + Configuration conf = job.getConfiguration(); + + String keyValueSeparator = conf.get( + CSVInputFormat.DELIMITER, + CSVInputFormat.DELIMITER_DEFAULT); + + return new CarbonStreamingRecordWriter<K, V>( + conf, + getDefaultWorkFile(job, null), + keyValueSeparator); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java new file mode 100644 index 0000000..9d1951f --- /dev/null +++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hadoop.streaming; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + + +public class CarbonStreamingRecordWriter<K,V> extends RecordWriter<K, V> { + + private static final String utf8 = "UTF-8"; + + private static final byte[] newline; + + static { + + try { + + newline = "\n".getBytes(utf8); + + } catch (UnsupportedEncodingException uee) { + + throw new IllegalArgumentException("Can't find " + utf8 + " encoding"); + } + } + + private FSDataOutputStream outputStream; + + private FileSystem fs; + + private Path file; + + private volatile boolean isClosed; + + private final byte[] 
keyValueSeparator; + + public void initOut() throws IOException { + + outputStream = fs.create(file, false); + + isClosed = false; + } + + public CarbonStreamingRecordWriter( + Configuration conf, + Path file, + String keyValueSeparator) throws IOException { + + this.file = file; + + fs = FileSystem.get(conf); + + outputStream = fs.create(file, false); + + isClosed = false; + + try { + + this.keyValueSeparator = keyValueSeparator.getBytes(utf8); + + } catch (UnsupportedEncodingException uee) { + + throw new IllegalArgumentException("Can't find " + utf8 + " encoding"); + + } + + } + + public CarbonStreamingRecordWriter( + Configuration conf, + Path file) throws IOException { + + this(conf, file, ","); + + } + + /** + * Write Object to byte stream. + */ + + private void writeObject(Object o) throws IOException { + + if (o instanceof Text) { + Text to = (Text)o; + + outputStream.write(to.getBytes(), 0, to.getLength()); + + } else { + + outputStream.write(o.toString().getBytes(utf8)); + + } + } + + /** + * Write streaming data as text file (temporary) + */ + + @Override + public synchronized void write(K key, V value) throws IOException { + + boolean isNULLKey = key == null || key instanceof NullWritable; + + boolean isNULLValue = value == null || value instanceof NullWritable; + + if (isNULLKey && isNULLValue) { + + return; + } + + if (!isNULLKey) { + + writeObject(key); + } + + if (!isNULLKey && !isNULLValue) { + + outputStream.write(keyValueSeparator); + } + + if (!isNULLValue) { + + writeObject(value); + } + + outputStream.write(newline); + } + + private void closeInternal() throws IOException { + + if (!isClosed) { + + outputStream.close(); + + isClosed = true; + } + + } + + public void flush() throws IOException { + + outputStream.hflush(); + } + + public long getOffset() throws IOException { + + return outputStream.getPos(); + } + + public void commit(boolean finalCommit) throws IOException { + + closeInternal(); + + Path commitFile = new Path(file.getParent(), + 
CarbonTablePath.getCarbonDataPrefix() + System.currentTimeMillis()); + + fs.rename(file, commitFile); + + if (!finalCommit) { + initOut(); + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + + closeInternal(); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 1b021b0..2f97dc8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -21,32 +21,36 @@ import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.commons.lang.StringUtils -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.CarbonLateDecodeStrategy import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory} import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation} import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier 
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry import org.apache.carbondata.core.metadata.schema.table.TableInfo -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException + /** * Carbon relation provider compliant to data source api. * Creates carbon relations */ class CarbonSource extends CreatableRelationProvider with RelationProvider - with SchemaRelationProvider with DataSourceRegister { + with SchemaRelationProvider with DataSourceRegister with FileFormat { override def shortName(): String = "carbondata" @@ -54,7 +58,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { CarbonEnv.getInstance(sqlContext.sparkSession) - // if path is provided we can directly create Hadoop relation. \ + // if path is provided we can directly create Hadoop relation. // Otherwise create datasource relation parameters.get("tablePath") match { case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession, @@ -178,7 +182,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider /** * Returns the path of the table * - * @param sparkSession + * @param sparkSession * @param dbName * @param tableName * @return @@ -203,11 +207,32 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider (relation.tableMeta.tablePath, parameters) } } catch { - case ex: Exception => - throw new Exception(s"Do not have $dbName and $tableName", ex) + case ex: Exception => + throw new Exception(s"Do not have $dbName and $tableName", ex) } } + /** + * Prepares a write job and returns an [[OutputWriterFactory]]. 
Client side job preparation can + * be put here. For example, user defined output committer can be configured here + * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass. + */ + def prepareWrite( + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory() + + /** + * When possible, this method should return the schema of the given `files`. When the format + * does not support inference, or no valid files are given should return None. In these cases + * Spark will require that user specify the schema manually. + */ + def inferSchema( + sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType)) + } object CarbonSource { http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala new file mode 100644 index 0000000..be69885 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala @@ -0,0 +1,88 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.streaming + + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.spark.sql.execution.datasources.OutputWriterFactory +import org.apache.spark.sql.types.StructType + +import org.apache.carbondata.core.util.path.CarbonTablePath + + +class CarbonStreamingOutputWriterFactory extends OutputWriterFactory { + + /** + * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]], + * this method gets called by each task on executor side + * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s. + * + * @param path Path to write the file. + * @param dataSchema Schema of the rows to be written. Partition columns are not + * included in the schema if the relation being written is + * partitioned. + * @param context The Hadoop MapReduce task context. 
+ */ + + override def newInstance( + path: String, + + dataSchema: StructType, + + context: TaskAttemptContext) : CarbonStreamingOutputWriter = { + + new CarbonStreamingOutputWriter(path, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + + CarbonTablePath.STREAM_FILE_NAME_EXT + } + +} + +object CarbonStreamingOutputWriterFactory { + + private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]() + + def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = { + + if (writers.containsKey(path)) { + throw new IllegalArgumentException(path + " writer already exists") + } + + writers.put(path, writer) + } + + def getWriter(path: String): CarbonStreamingOutputWriter = { + + writers.get(path) + } + + def containsWriter(path: String): Boolean = { + + writers.containsKey(path) + } + + def removeWriter(path: String): Unit = { + + writers.remove(path) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/772efdd5/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala new file mode 100644 index 0000000..dfc8ff3 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{NullWritable, Text} +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.OutputWriter +import org.apache.spark.sql.Row + +import org.apache.carbondata.hadoop.streaming.{CarbonStreamingOutputFormat, CarbonStreamingRecordWriter} + +class CarbonStreamingOutputWriter ( + path: String, + context: TaskAttemptContext) + extends OutputWriter { + + private[this] val buffer = new Text() + + private val recordWriter: CarbonStreamingRecordWriter[NullWritable, Text] = { + + val outputFormat = new CarbonStreamingOutputFormat[NullWritable, Text] () { + + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String) : Path = { + new Path(path) + } + + /* + May need to override + def getOutputCommiter(c: TaskAttemptContext): OutputCommitter = { + null + } + */ + + } + + outputFormat. 
+ getRecordWriter(context).asInstanceOf[CarbonStreamingRecordWriter[NullWritable, Text]] + } + + override def write(row: Row): Unit = { + + throw new UnsupportedOperationException("call writeInternal") + + } + + override protected [sql] def writeInternal(row: InternalRow): Unit = { + + val utf8string = row.getUTF8String(0) + + buffer.set(utf8string.getBytes) + + recordWriter.write(NullWritable.get(), buffer) + + } + + def getpath: String = path + + override def close(): Unit = { + + recordWriter.close(context) + + } + + def flush(): Unit = { + + recordWriter.flush() + + } + + def getPos(): Long = { + + recordWriter.getOffset() + + } + + def commit(finalCommit: Boolean): Unit = { + + recordWriter.commit(finalCommit) + + } +}
