This is an automated email from the ASF dual-hosted git repository.
weibin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git
The following commit(s) were added to refs/heads/main by this push:
new 73e0702c feat(c++,spark,pyspark): support json payload file format (#518)
73e0702c is described below
commit 73e0702c9797878444e927401ddbe0f1f4ae1736
Author: amygbAI <[email protected]>
AuthorDate: Tue Jun 25 07:43:11 2024 +0530
feat(c++,spark,pyspark): support json payload file format (#518)
Reason for this PR
This is a rebased pull request (the earlier version ran into JSONOptions being
private in Spark 3.2 and 3.3). The test artifacts were added separately in
incubator-graphar-testing. The change compiles with Spark 3.2 and 3.3 (in both
datasources-32 and datasources-33).
What changes are included in this PR?
Changes in datasources-32 and datasources-33
Changes in cpp
Changes in pyspark
Are these changes tested?
Yes, the generated LDBC samples were added in
incubator-graphar-testing/ldbc_sample/json.
Are there any user-facing changes?
None
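
For reference, a minimal sketch of how the new json payload format can be
driven through the Spark reader, mirroring the TestReader.scala case added in
this commit. The local prefix path is a placeholder and the VertexReader
import package is an assumption; the reader calls follow the added test.

    // Sketch only: assumes an active SparkSession `spark`, the GraphAr Spark
    // jar on the classpath, and a local checkout of the testing artifacts.
    // The org.apache.graphar.reader package for VertexReader is assumed here.
    import org.apache.graphar.VertexInfo
    import org.apache.graphar.reader.VertexReader

    val prefix = "/path/to/incubator-graphar-testing/ldbc_sample/json/"
    val vertexInfo = VertexInfo.loadVertexInfo(prefix + "Person.vertex.yml", spark)

    // The reader resolves the "json" file type declared in the vertex YAML.
    val reader = new VertexReader(prefix, vertexInfo, spark)
    println(reader.readVerticesNumber())  // 903 for the ldbc_sample data

    // Read the first chunk of the property group that contains "gender".
    val propertyGroup = vertexInfo.getPropertyGroup("gender")
    val chunk = reader.readVertexPropertyChunk(propertyGroup, 0)
    chunk.show(5)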
---------
Signed-off-by: amygbAI <[email protected]>
Signed-off-by: acezen <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
Co-authored-by: acezen <[email protected]>
---
cpp/include/graphar/fwd.h | 2 +-
cpp/include/graphar/util/file_type.h | 2 +
cpp/src/filesystem.cc | 2 +
cpp/test/test_arrow_chunk_reader.cc | 64 ++++++++++++++++++-
.../apache/graphar/datasources/GarDataSource.scala | 2 +
.../org/apache/spark/sql/graphar/GarScan.scala | 45 +++++++++++++
.../apache/spark/sql/graphar/GarScanBuilder.scala | 6 +-
.../org/apache/spark/sql/graphar/GarTable.scala | 18 ++++++
.../spark/sql/graphar/json/JSONWriteBuilder.scala | 73 ++++++++++++++++++++++
.../apache/graphar/datasources/GarDataSource.scala | 2 +
.../org/apache/spark/sql/graphar/GarScan.scala | 44 +++++++++++++
.../apache/spark/sql/graphar/GarScanBuilder.scala | 2 +
.../org/apache/spark/sql/graphar/GarTable.scala | 18 ++++++
.../spark/sql/graphar/json/JSONWriteBuilder.scala | 73 ++++++++++++++++++++++
.../main/scala/org/apache/graphar/GraphInfo.scala | 3 +
.../test/scala/org/apache/graphar/TestReader.scala | 19 ++++++
pyspark/graphar_pyspark/enums.py | 1 +
pyspark/graphar_pyspark/graph.py | 2 +-
pyspark/tests/test_reader.py | 39 ++++++++++++
testing | 2 +-
20 files changed, 413 insertions(+), 6 deletions(-)
diff --git a/cpp/include/graphar/fwd.h b/cpp/include/graphar/fwd.h
index acc4ce06..d053a165 100644
--- a/cpp/include/graphar/fwd.h
+++ b/cpp/include/graphar/fwd.h
@@ -73,7 +73,7 @@ using IdType = int64_t;
enum class Type;
class DataType;
/** Type of file format */
-enum FileType { CSV = 0, PARQUET = 1, ORC = 2 };
+enum FileType { CSV = 0, PARQUET = 1, ORC = 2, JSON = 3 };
enum class AdjListType : uint8_t;
template <typename T>
diff --git a/cpp/include/graphar/util/file_type.h b/cpp/include/graphar/util/file_type.h
index f1d08daa..7fc218ed 100644
--- a/cpp/include/graphar/util/file_type.h
+++ b/cpp/include/graphar/util/file_type.h
@@ -31,6 +31,7 @@ namespace graphar {
static inline FileType StringToFileType(const std::string& str) {
static const std::map<std::string, FileType> str2file_type{
{"csv", FileType::CSV},
+ {"json", FileType::JSON},
{"parquet", FileType::PARQUET},
{"orc", FileType::ORC}};
try {
@@ -43,6 +44,7 @@ static inline FileType StringToFileType(const std::string& str) {
static inline const char* FileTypeToString(FileType file_type) {
static const std::map<FileType, const char*> file_type2string{
{FileType::CSV, "csv"},
+ {FileType::JSON, "json"},
{FileType::PARQUET, "parquet"},
{FileType::ORC, "orc"}};
return file_type2string.at(file_type);
diff --git a/cpp/src/filesystem.cc b/cpp/src/filesystem.cc
index bc78108a..23b9ec0b 100644
--- a/cpp/src/filesystem.cc
+++ b/cpp/src/filesystem.cc
@@ -91,6 +91,8 @@ std::shared_ptr<ds::FileFormat> FileSystem::GetFileFormat(
return std::make_shared<ds::ParquetFileFormat>();
case ORC:
return std::make_shared<ds::OrcFileFormat>();
+ case JSON:
+ return std::make_shared<ds::JsonFileFormat>();
default:
return nullptr;
}
diff --git a/cpp/test/test_arrow_chunk_reader.cc b/cpp/test/test_arrow_chunk_reader.cc
index 36608d64..cc5b3e99 100644
--- a/cpp/test/test_arrow_chunk_reader.cc
+++ b/cpp/test/test_arrow_chunk_reader.cc
@@ -291,7 +291,6 @@ TEST_CASE_METHOD(GlobalFixture, "ArrowChunkReader") {
auto maybe_reader = AdjListArrowChunkReader::Make(
graph_info, src_label, edge_label, dst_label,
AdjListType::ordered_by_source);
- REQUIRE(maybe_reader.status().ok());
auto reader = maybe_reader.value();
// check reader start from vertex chunk 0
auto result = reader->GetChunk();
@@ -463,4 +462,67 @@ TEST_CASE_METHOD(GlobalFixture, "ArrowChunkReader") {
REQUIRE(reader->seek(1024).IsIndexError());
}
}
+
+TEST_CASE_METHOD(GlobalFixture, "JSON_TEST") {
+ // read file and construct graph info
+ std::string path = test_data_dir + "/ldbc_sample/json/LdbcSample.graph.yml";
+ std::string src_label = "Person", edge_label = "Knows", dst_label = "Person";
+ std::string vertex_property_name = "id";
+ std::string edge_property_name = "creationDate";
+ auto maybe_graph_info = GraphInfo::Load(path);
+ REQUIRE(maybe_graph_info.status().ok());
+ auto graph_info = maybe_graph_info.value();
+ auto vertex_info = graph_info->GetVertexInfo(src_label);
+ REQUIRE(vertex_info != nullptr);
+ auto v_pg = vertex_info->GetPropertyGroup(vertex_property_name);
+ REQUIRE(v_pg != nullptr);
+ auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
+ REQUIRE(edge_info != nullptr);
+ auto e_pg = edge_info->GetPropertyGroup(edge_property_name);
+ REQUIRE(e_pg != nullptr);
+
+ SECTION("VertexPropertyArrowChunkReader") {
+ auto maybe_reader = VertexPropertyArrowChunkReader::Make(
+ graph_info, src_label, vertex_property_name);
+ REQUIRE(maybe_reader.status().ok());
+ auto reader = maybe_reader.value();
+ REQUIRE(reader->GetChunkNum() == 10);
+
+ SECTION("Basics") {
+ auto result = reader->GetChunk();
+ REQUIRE(!result.has_error());
+ auto table = result.value();
+ REQUIRE(table->num_rows() == 100);
+ REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+ nullptr);
+
+ // seek
+ REQUIRE(reader->seek(100).ok());
+ result = reader->GetChunk();
+ REQUIRE(!result.has_error());
+ table = result.value();
+ REQUIRE(table->num_rows() == 100);
+ REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+ nullptr);
+ REQUIRE(reader->next_chunk().ok());
+ result = reader->GetChunk();
+ REQUIRE(!result.has_error());
+ table = result.value();
+ REQUIRE(table->num_rows() == 100);
+ REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+ nullptr);
+ REQUIRE(reader->seek(900).ok());
+ result = reader->GetChunk();
+ REQUIRE(!result.has_error());
+ table = result.value();
+ REQUIRE(table->num_rows() == 3);
+ REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+ nullptr);
+ REQUIRE(reader->GetChunkNum() == 10);
+ REQUIRE(reader->next_chunk().IsIndexError());
+
+ REQUIRE(reader->seek(1024).IsIndexError());
+ }
+ }
+}
} // namespace graphar
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
index 7424ad68..eac30887 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.graphar.GarTable
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
@@ -174,6 +175,7 @@ class GarDataSource extends TableProvider with DataSourceRegister {
case "csv" => classOf[CSVFileFormat]
case "orc" => classOf[OrcFileFormat]
case "parquet" => classOf[ParquetFileFormat]
+ case "json" => classOf[JsonFileFormat]
case _ => throw new IllegalArgumentException
}
}
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
index b6027f8a..804f0f51 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
@@ -24,7 +24,9 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.parquet.{
@@ -34,6 +36,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{
}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory
+import org.apache.spark.sql.execution.datasources.v2.json.JsonPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.{
@@ -74,6 +77,7 @@ case class GarScan(
case "csv" => createCSVReaderFactory()
case "orc" => createOrcReaderFactory()
case "parquet" => createParquetReaderFactory()
+ case "json" => createJSONReaderFactory()
case _ =>
throw new IllegalArgumentException("Invalid format name: " +
formatName)
}
@@ -193,6 +197,46 @@ case class GarScan(
)
}
+ // Create the reader factory for the JSON format.
+ private def createJSONReaderFactory(): PartitionReaderFactory = {
+ val parsedOptions = new JSONOptionsInRead(
+ CaseInsensitiveMap(options.asScala.toMap),
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord
+ )
+
+ // Check a field requirement for corrupt records here to throw an exception in a driver side
+ ExprUtils.verifyColumnNameOfCorruptRecord(
+ dataSchema,
+ parsedOptions.columnNameOfCorruptRecord
+ )
+ // Don't push any filter which refers to the "virtual" column which cannot present in the input.
+ // Such filters will be applied later on the upper layer.
+ val actualFilters =
+ pushedFilters.filterNot(
+ _.references.contains(parsedOptions.columnNameOfCorruptRecord)
+ )
+
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ val hadoopConf =
+ sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ val broadcastedConf = sparkSession.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf)
+ )
+ // The partition values are already truncated in `FileScan.partitions`.
+ // We should use `readPartitionSchema` as the partition schema here.
+ JsonPartitionReaderFactory(
+ sparkSession.sessionState.conf,
+ broadcastedConf,
+ dataSchema,
+ readDataSchema,
+ readPartitionSchema,
+ parsedOptions,
+ actualFilters
+ )
+ }
+
/**
* Override "partitions" of
* org.apache.spark.sql.execution.datasources.v2.FileScan to disable splitting
@@ -272,6 +316,7 @@ case class GarScan(
case "csv" => super.hashCode()
case "orc" => getClass.hashCode()
case "parquet" => getClass.hashCode()
+ case "json" => super.hashCode()
case _ =>
throw new IllegalArgumentException("Invalid format name: " + formatName)
}
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
index 0ae95894..033fbd34 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
@@ -56,6 +56,7 @@ case class GarScanBuilder(
override def pushedFilters(): Array[Filter] = formatName match {
case "csv" => Array.empty[Filter]
+ case "json" => Array.empty[Filter]
case "orc" => pushedOrcFilters
case "parquet" => pushedParquetFilters
case _ =>
@@ -87,8 +88,9 @@ case class GarScanBuilder(
// Check if the file format supports nested schema pruning.
override protected val supportsNestedSchemaPruning: Boolean =
formatName match {
- case "csv" => false
- case "orc" => sparkSession.sessionState.conf.nestedSchemaPruningEnabled
+ case "csv" => false
+ case "json" => false
+ case "orc" => sparkSession.sessionState.conf.nestedSchemaPruningEnabled
case "parquet" =>
sparkSession.sessionState.conf.nestedSchemaPruningEnabled
case _ =>
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
index acf4943c..df874ea3 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
@@ -31,8 +31,11 @@ import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.graphar.csv.CSVWriteBuilder
import org.apache.spark.sql.graphar.orc.OrcWriteBuilder
import org.apache.spark.sql.graphar.parquet.ParquetWriteBuilder
+import org.apache.spark.sql.graphar.json.JSONWriteBuilder
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.catalyst.json.JSONOptions
import scala.collection.JavaConverters._
@@ -82,8 +85,21 @@ case class GarTable(
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
case "parquet" =>
ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
+ case "json" => {
+ val parsedOptions = new JSONOptions(
+ options.asScala.toMap,
+ sparkSession.sessionState.conf.sessionLocalTimeZone
+ )
+
+ JsonDataSource(parsedOptions).inferSchema(
+ sparkSession,
+ files,
+ parsedOptions
+ )
+ }
case _ =>
throw new IllegalArgumentException("Invalid format name: " +
formatName)
+
}
/** Construct a new write builder according to the actual file format. */
@@ -95,6 +111,8 @@ case class GarTable(
new OrcWriteBuilder(paths, formatName, supportsDataType, info)
case "parquet" =>
new ParquetWriteBuilder(paths, formatName, supportsDataType, info)
+ case "json" =>
+ new JSONWriteBuilder(paths, formatName, supportsDataType, info)
case _ =>
throw new IllegalArgumentException("Invalid format name: " +
formatName)
}
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
new file mode 100644
index 00000000..150a9a9f
--- /dev/null
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
@@ -0,0 +1,73 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Derived from Apache Spark 3.5.1
+// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
+
+package org.apache.spark.sql.graphar.json
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.connector.write.LogicalWriteInfo
+import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
+import org.apache.spark.sql.execution.datasources.{
+ CodecStreams,
+ OutputWriter,
+ OutputWriterFactory
+}
+
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StructType, DataType}
+
+import org.apache.spark.sql.graphar.GarWriteBuilder
+
+class JSONWriteBuilder(
+ paths: Seq[String],
+ formatName: String,
+ supportsDataType: DataType => Boolean,
+ info: LogicalWriteInfo
+) extends GarWriteBuilder(paths, formatName, supportsDataType, info) {
+ override def prepareWrite(
+ sqlConf: SQLConf,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType
+ ): OutputWriterFactory = {
+ val conf = job.getConfiguration
+ val parsedOptions = new JSONOptions(
+ options,
+ sqlConf.sessionLocalTimeZone,
+ sqlConf.columnNameOfCorruptRecord
+ )
+ parsedOptions.compressionCodec.foreach { codec =>
+ CompressionCodecs.setCodecConfiguration(conf, codec)
+ }
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext
+ ): OutputWriter = {
+ new JsonOutputWriter(path, parsedOptions, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ ".json" + CodecStreams.getCompressionExtension(context)
+ }
+ }
+ }
+}
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
index b6094914..e502f82c 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.sources.DataSourceRegister
@@ -173,6 +174,7 @@ class GarDataSource extends TableProvider with DataSourceRegister {
case "csv" => classOf[CSVFileFormat]
case "orc" => classOf[OrcFileFormat]
case "parquet" => classOf[ParquetFileFormat]
+ case "json" => classOf[JsonFileFormat]
case _ => throw new IllegalArgumentException
}
}
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
index feaa7e56..a4d5207b 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
@@ -24,7 +24,9 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{
@@ -39,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{
}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory
+import org.apache.spark.sql.execution.datasources.v2.json.JsonPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
import org.apache.spark.sql.internal.SQLConf
@@ -74,6 +77,7 @@ case class GarScan(
case "csv" => createCSVReaderFactory()
case "orc" => createOrcReaderFactory()
case "parquet" => createParquetReaderFactory()
+ case "json" => createJSONReaderFactory()
case _ =>
throw new IllegalArgumentException("Invalid format name: " +
formatName)
}
@@ -203,6 +207,45 @@ case class GarScan(
)
}
+ // Create the reader factory for the JSON format.
+ private def createJSONReaderFactory(): PartitionReaderFactory = {
+ val parsedOptions = new JSONOptionsInRead(
+ CaseInsensitiveMap(options.asScala.toMap),
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+ // Check a field requirement for corrupt records here to throw an exception in a driver side
+ ExprUtils.verifyColumnNameOfCorruptRecord(
+ dataSchema,
+ parsedOptions.columnNameOfCorruptRecord
+ )
+ // Don't push any filter which refers to the "virtual" column which cannot present in the input.
+ // Such filters will be applied later on the upper layer.
+ val actualFilters =
+ pushedFilters.filterNot(
+ _.references.contains(parsedOptions.columnNameOfCorruptRecord)
+ )
+
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ val hadoopConf =
+ sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ val broadcastedConf = sparkSession.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf)
+ )
+ // The partition values are already truncated in `FileScan.partitions`.
+ // We should use `readPartitionSchema` as the partition schema here.
+ JsonPartitionReaderFactory(
+ sparkSession.sessionState.conf,
+ broadcastedConf,
+ dataSchema,
+ readDataSchema,
+ readPartitionSchema,
+ parsedOptions,
+ actualFilters
+ )
+ }
+
/**
* Override "partitions" of
* org.apache.spark.sql.execution.datasources.v2.FileScan to disable splitting
@@ -280,6 +323,7 @@ case class GarScan(
/** Get the hash code of the object. */
override def hashCode(): Int = formatName match {
case "csv" => super.hashCode()
+ case "json" => super.hashCode()
case "orc" => getClass.hashCode()
case "parquet" => getClass.hashCode()
case _ =>
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
index 94fe5752..3b2ca60e 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
@@ -52,6 +52,7 @@ case class GarScanBuilder(
this.filters = dataFilters
formatName match {
case "csv" => Array.empty[Filter]
+ case "json" => Array.empty[Filter]
case "orc" => pushedOrcFilters
case "parquet" => pushedParquetFilters
case _ =>
@@ -84,6 +85,7 @@ case class GarScanBuilder(
override protected val supportsNestedSchemaPruning: Boolean =
formatName match {
case "csv" => false
+ case "json" => false
case "orc" => sparkSession.sessionState.conf.nestedSchemaPruningEnabled
case "parquet" =>
sparkSession.sessionState.conf.nestedSchemaPruningEnabled
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
index acf4943c..e24e9051 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
@@ -31,8 +31,11 @@ import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.graphar.csv.CSVWriteBuilder
import org.apache.spark.sql.graphar.orc.OrcWriteBuilder
import org.apache.spark.sql.graphar.parquet.ParquetWriteBuilder
+import org.apache.spark.sql.graphar.json.JSONWriteBuilder
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.catalyst.json.JSONOptions
import scala.collection.JavaConverters._
@@ -82,8 +85,21 @@ case class GarTable(
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
case "parquet" =>
ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
+ case "json" => {
+ val parsedOptions = new JSONOptions(
+ options.asScala.toMap,
+ sparkSession.sessionState.conf.sessionLocalTimeZone
+ )
+
+ JsonDataSource(parsedOptions).inferSchema(
+ sparkSession,
+ files,
+ parsedOptions
+ )
+ }
case _ =>
throw new IllegalArgumentException("Invalid format name: " +
formatName)
+
}
/** Construct a new write builder according to the actual file format. */
@@ -95,6 +111,8 @@ case class GarTable(
new OrcWriteBuilder(paths, formatName, supportsDataType, info)
case "parquet" =>
new ParquetWriteBuilder(paths, formatName, supportsDataType, info)
+ case "json" =>
+ new JSONWriteBuilder(paths, formatName, supportsDataType, info)
case _ =>
throw new IllegalArgumentException("Invalid format name: " +
formatName)
}
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
new file mode 100644
index 00000000..150a9a9f
--- /dev/null
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
@@ -0,0 +1,73 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Derived from Apache Spark 3.5.1
+// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
+
+package org.apache.spark.sql.graphar.json
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.connector.write.LogicalWriteInfo
+import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
+import org.apache.spark.sql.execution.datasources.{
+ CodecStreams,
+ OutputWriter,
+ OutputWriterFactory
+}
+
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StructType, DataType}
+
+import org.apache.spark.sql.graphar.GarWriteBuilder
+
+class JSONWriteBuilder(
+ paths: Seq[String],
+ formatName: String,
+ supportsDataType: DataType => Boolean,
+ info: LogicalWriteInfo
+) extends GarWriteBuilder(paths, formatName, supportsDataType, info) {
+ override def prepareWrite(
+ sqlConf: SQLConf,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType
+ ): OutputWriterFactory = {
+ val conf = job.getConfiguration
+ val parsedOptions = new JSONOptions(
+ options,
+ sqlConf.sessionLocalTimeZone,
+ sqlConf.columnNameOfCorruptRecord
+ )
+ parsedOptions.compressionCodec.foreach { codec =>
+ CompressionCodecs.setCodecConfiguration(conf, codec)
+ }
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext
+ ): OutputWriter = {
+ new JsonOutputWriter(path, parsedOptions, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ ".json" + CodecStreams.getCompressionExtension(context)
+ }
+ }
+ }
+}
diff --git a/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala b/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala
index 36804617..5c410d65 100644
--- a/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala
+++ b/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala
@@ -96,6 +96,7 @@ object FileType extends Enumeration {
val CSV = Value(0)
val PARQUET = Value(1)
val ORC = Value(2)
+ val JSON = Value(3)
/**
* File type to string.
@@ -109,6 +110,7 @@ object FileType extends Enumeration {
case FileType.CSV => "csv"
case FileType.PARQUET => "parquet"
case FileType.ORC => "orc"
+ case FileType.JSON => "json"
case _ => throw new IllegalArgumentException("Unknown file type")
}
@@ -124,6 +126,7 @@ object FileType extends Enumeration {
case "csv" => FileType.CSV
case "parquet" => FileType.PARQUET
case "orc" => FileType.ORC
+ case "json" => FileType.JSON
case _ => throw new IllegalArgumentException("Unknown file type: " + str)
}
diff --git a/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala b/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
index 2aafb29d..f61710b9 100644
--- a/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
+++ b/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
@@ -191,6 +191,25 @@ class ReaderSuite extends BaseTestSuite {
)
}
+ test("json test") {
+ // construct the vertex information
+ val prefix = testData + "/ldbc_sample/json/"
+ val vertex_yaml = prefix + "Person.vertex.yml"
+ val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+
+ // construct the vertex reader
+ val reader = new VertexReader(prefix, vertex_info, spark)
+
+ // test reading the number of vertices
+ assert(reader.readVerticesNumber() == 903)
+ val property_group = vertex_info.getPropertyGroup("gender")
+
+ // test reading a single property chunk
+ val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
+ assert(single_chunk_df.columns.length == 5)
+ assert(single_chunk_df.count() == 100)
+ }
+
test("read edge chunks") {
// construct the edge information
val prefix = testData + "/ldbc_sample/csv/"
diff --git a/pyspark/graphar_pyspark/enums.py b/pyspark/graphar_pyspark/enums.py
index 1e629026..54094d00 100644
--- a/pyspark/graphar_pyspark/enums.py
+++ b/pyspark/graphar_pyspark/enums.py
@@ -58,6 +58,7 @@ class FileType(Enum):
"""Type of file format."""
CSV = "csv"
+ JSON = "json"
PARQUET = "parquet"
ORC = "orc"
diff --git a/pyspark/graphar_pyspark/graph.py b/pyspark/graphar_pyspark/graph.py
index 1c8f8043..eeab184c 100644
--- a/pyspark/graphar_pyspark/graph.py
+++ b/pyspark/graphar_pyspark/graph.py
@@ -195,7 +195,7 @@ class GraphWriter:
:param name: the name of graph, default is 'grpah'
:param vertex_chunk_size: the chunk size for vertices, default is 2^18
:param edge_chunk_size: the chunk size for edges, default is 2^22
- :param file_type: the file type for data payload file, support [parquet, orc, csv], default is parquet.
+ :param file_type: the file type for data payload file, support [parquet, orc, csv, json], default is parquet.
:param version: version of GraphAr format, default is v1.
"""
if vertex_chunk_size is None:
diff --git a/pyspark/tests/test_reader.py b/pyspark/tests/test_reader.py
index a92a399f..9736c508 100644
--- a/pyspark/tests/test_reader.py
+++ b/pyspark/tests/test_reader.py
@@ -100,6 +100,45 @@ def test_edge_reader(spark):
assert edge_reader.read_edges_number(0) == 0
assert edge_reader.read_offset(0).count() > 0
+def test_vertex_reader_with_json(spark):
+ initialize(spark)
+
+ vertex_info = VertexInfo.load_vertex_info(
+ GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json")
+ .joinpath("Person.vertex.yml")
+ .absolute()
+ .__str__()
+ )
+ vertex_reader = VertexReader.from_python(
+ GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json/").absolute().__str__(),
+ vertex_info,
+ )
+ assert VertexReader.from_scala(vertex_reader.to_scala()) is not None
+ assert vertex_reader.read_vertices_number() > 0
+ assert (
+ vertex_reader.read_vertex_property_group(
+ vertex_info.get_property_group("firstName")
+ ).count()
+ > 0
+ )
+ assert (
+ vertex_reader.read_vertex_property_chunk(
+ vertex_info.get_property_groups()[0], 0
+ ).count()
+ > 0
+ )
+ assert (
+ vertex_reader.read_all_vertex_property_groups().count()
+ >= vertex_reader.read_vertex_property_group(
+ vertex_info.get_property_group("lastName")
+ ).count()
+ )
+ assert (
+ vertex_reader.read_multiple_vertex_property_groups(
+ [vertex_info.get_property_group("gender")]
+ ).count()
+ > 0
+ )
def test_graph_reader(spark):
initialize(spark)
diff --git a/testing b/testing
index 5066ecba..5419fc79 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 5066ecba2ae20e4540d4199679da7957f85adf4c
+Subproject commit 5419fc79c43c7dc37d2b3288da6ea319b44a7042
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]