This is an automated email from the ASF dual-hosted git repository.

weibin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git


The following commit(s) were added to refs/heads/main by this push:
     new 73e0702c feat(c++,spark,pyspark): support json payload file format (#518)
73e0702c is described below

commit 73e0702c9797878444e927401ddbe0f1f4ae1736
Author: amygbAI <[email protected]>
AuthorDate: Tue Jun 25 07:43:11 2024 +0530

    feat(c++,spark,pyspark): support json payload file format (#518)
    
    Reason for this PR
    This is a rebased pull request, reworked because JSONOptions is private in Spark 3.2 and 3.3. The test artifacts were added separately in incubator-graphar-testing. The change compiles against Spark 3.2 and 3.3 (in both datasources-32 and datasources-33).
    
    What changes are included in this PR?
    Changes in datasources-32 and datasources-33
    Changes in cpp
    Changes in pyspark
    
    Are these changes tested?
    Yes, the generated LDBC sample data was added in incubator-graphar-testing/ldbc_sample/json.
    
    Are there any user-facing changes?
    
    None
    
    ---------
    
    Signed-off-by: amygbAI <[email protected]>
    Signed-off-by: acezen <[email protected]>
    Co-authored-by: Ubuntu <[email protected]>
    Co-authored-by: acezen <[email protected]>
---
 cpp/include/graphar/fwd.h                          |  2 +-
 cpp/include/graphar/util/file_type.h               |  2 +
 cpp/src/filesystem.cc                              |  2 +
 cpp/test/test_arrow_chunk_reader.cc                | 64 ++++++++++++++++++-
 .../apache/graphar/datasources/GarDataSource.scala |  2 +
 .../org/apache/spark/sql/graphar/GarScan.scala     | 45 +++++++++++++
 .../apache/spark/sql/graphar/GarScanBuilder.scala  |  6 +-
 .../org/apache/spark/sql/graphar/GarTable.scala    | 18 ++++++
 .../spark/sql/graphar/json/JSONWriteBuilder.scala  | 73 ++++++++++++++++++++++
 .../apache/graphar/datasources/GarDataSource.scala |  2 +
 .../org/apache/spark/sql/graphar/GarScan.scala     | 44 +++++++++++++
 .../apache/spark/sql/graphar/GarScanBuilder.scala  |  2 +
 .../org/apache/spark/sql/graphar/GarTable.scala    | 18 ++++++
 .../spark/sql/graphar/json/JSONWriteBuilder.scala  | 73 ++++++++++++++++++++++
 .../main/scala/org/apache/graphar/GraphInfo.scala  |  3 +
 .../test/scala/org/apache/graphar/TestReader.scala | 19 ++++++
 pyspark/graphar_pyspark/enums.py                   |  1 +
 pyspark/graphar_pyspark/graph.py                   |  2 +-
 pyspark/tests/test_reader.py                       | 39 ++++++++++++
 testing                                            |  2 +-
 20 files changed, 413 insertions(+), 6 deletions(-)
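
For context on how the new payload is consumed, here is a minimal Scala sketch of reading a JSON-backed vertex chunk through the Spark library, mirroring the "json test" added in TestReader.scala below. The package path for VertexReader, the local data path, and the wrapper object are assumptions for illustration, not part of this commit.

    import org.apache.graphar.VertexInfo
    import org.apache.graphar.reader.VertexReader   // package path assumed
    import org.apache.spark.sql.SparkSession

    object JsonReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // Hypothetical checkout of the testing submodule's JSON samples.
        val prefix = "/path/to/incubator-graphar-testing/ldbc_sample/json/"
        val vertexInfo = VertexInfo.loadVertexInfo(prefix + "Person.vertex.yml", spark)

        // The reader resolves the "json" payload files declared in the vertex YAML.
        val reader = new VertexReader(prefix, vertexInfo, spark)
        println(s"vertices: ${reader.readVerticesNumber()}")

        // Read chunk 0 of the property group that contains "gender" as a DataFrame.
        val pg = vertexInfo.getPropertyGroup("gender")
        reader.readVertexPropertyChunk(pg, 0).show(5)
      }
    }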

diff --git a/cpp/include/graphar/fwd.h b/cpp/include/graphar/fwd.h
index acc4ce06..d053a165 100644
--- a/cpp/include/graphar/fwd.h
+++ b/cpp/include/graphar/fwd.h
@@ -73,7 +73,7 @@ using IdType = int64_t;
 enum class Type;
 class DataType;
 /** Type of file format */
-enum FileType { CSV = 0, PARQUET = 1, ORC = 2 };
+enum FileType { CSV = 0, PARQUET = 1, ORC = 2, JSON = 3 };
 enum class AdjListType : uint8_t;
 
 template <typename T>
diff --git a/cpp/include/graphar/util/file_type.h b/cpp/include/graphar/util/file_type.h
index f1d08daa..7fc218ed 100644
--- a/cpp/include/graphar/util/file_type.h
+++ b/cpp/include/graphar/util/file_type.h
@@ -31,6 +31,7 @@ namespace graphar {
 static inline FileType StringToFileType(const std::string& str) {
   static const std::map<std::string, FileType> str2file_type{
       {"csv", FileType::CSV},
+      {"json", FileType::JSON},
       {"parquet", FileType::PARQUET},
       {"orc", FileType::ORC}};
   try {
@@ -43,6 +44,7 @@ static inline FileType StringToFileType(const std::string& str) {
 static inline const char* FileTypeToString(FileType file_type) {
   static const std::map<FileType, const char*> file_type2string{
       {FileType::CSV, "csv"},
+      {FileType::JSON, "json"},
       {FileType::PARQUET, "parquet"},
       {FileType::ORC, "orc"}};
   return file_type2string.at(file_type);
diff --git a/cpp/src/filesystem.cc b/cpp/src/filesystem.cc
index bc78108a..23b9ec0b 100644
--- a/cpp/src/filesystem.cc
+++ b/cpp/src/filesystem.cc
@@ -91,6 +91,8 @@ std::shared_ptr<ds::FileFormat> FileSystem::GetFileFormat(
     return std::make_shared<ds::ParquetFileFormat>();
   case ORC:
     return std::make_shared<ds::OrcFileFormat>();
+  case JSON:
+    return std::make_shared<ds::JsonFileFormat>();
   default:
     return nullptr;
   }
diff --git a/cpp/test/test_arrow_chunk_reader.cc b/cpp/test/test_arrow_chunk_reader.cc
index 36608d64..cc5b3e99 100644
--- a/cpp/test/test_arrow_chunk_reader.cc
+++ b/cpp/test/test_arrow_chunk_reader.cc
@@ -291,7 +291,6 @@ TEST_CASE_METHOD(GlobalFixture, "ArrowChunkReader") {
       auto maybe_reader = AdjListArrowChunkReader::Make(
           graph_info, src_label, edge_label, dst_label,
           AdjListType::ordered_by_source);
-      REQUIRE(maybe_reader.status().ok());
       auto reader = maybe_reader.value();
       // check reader start from vertex chunk 0
       auto result = reader->GetChunk();
@@ -463,4 +462,67 @@ TEST_CASE_METHOD(GlobalFixture, "ArrowChunkReader") {
     REQUIRE(reader->seek(1024).IsIndexError());
   }
 }
+
+TEST_CASE_METHOD(GlobalFixture, "JSON_TEST") {
+  // read file and construct graph info
+  std::string path = test_data_dir + "/ldbc_sample/json/LdbcSample.graph.yml";
+  std::string src_label = "Person", edge_label = "Knows", dst_label = "Person";
+  std::string vertex_property_name = "id";
+  std::string edge_property_name = "creationDate";
+  auto maybe_graph_info = GraphInfo::Load(path);
+  REQUIRE(maybe_graph_info.status().ok());
+  auto graph_info = maybe_graph_info.value();
+  auto vertex_info = graph_info->GetVertexInfo(src_label);
+  REQUIRE(vertex_info != nullptr);
+  auto v_pg = vertex_info->GetPropertyGroup(vertex_property_name);
+  REQUIRE(v_pg != nullptr);
+  auto edge_info = graph_info->GetEdgeInfo(src_label, edge_label, dst_label);
+  REQUIRE(edge_info != nullptr);
+  auto e_pg = edge_info->GetPropertyGroup(edge_property_name);
+  REQUIRE(e_pg != nullptr);
+
+  SECTION("VertexPropertyArrowChunkReader") {
+    auto maybe_reader = VertexPropertyArrowChunkReader::Make(
+        graph_info, src_label, vertex_property_name);
+    REQUIRE(maybe_reader.status().ok());
+    auto reader = maybe_reader.value();
+    REQUIRE(reader->GetChunkNum() == 10);
+
+    SECTION("Basics") {
+      auto result = reader->GetChunk();
+      REQUIRE(!result.has_error());
+      auto table = result.value();
+      REQUIRE(table->num_rows() == 100);
+      REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+              nullptr);
+
+      // seek
+      REQUIRE(reader->seek(100).ok());
+      result = reader->GetChunk();
+      REQUIRE(!result.has_error());
+      table = result.value();
+      REQUIRE(table->num_rows() == 100);
+      REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+              nullptr);
+      REQUIRE(reader->next_chunk().ok());
+      result = reader->GetChunk();
+      REQUIRE(!result.has_error());
+      table = result.value();
+      REQUIRE(table->num_rows() == 100);
+      REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+              nullptr);
+      REQUIRE(reader->seek(900).ok());
+      result = reader->GetChunk();
+      REQUIRE(!result.has_error());
+      table = result.value();
+      REQUIRE(table->num_rows() == 3);
+      REQUIRE(table->GetColumnByName(GeneralParams::kVertexIndexCol) !=
+              nullptr);
+      REQUIRE(reader->GetChunkNum() == 10);
+      REQUIRE(reader->next_chunk().IsIndexError());
+
+      REQUIRE(reader->seek(1024).IsIndexError());
+    }
+  }
+}
 }  // namespace graphar
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
index 7424ad68..eac30887 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.graphar.GarTable
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
@@ -174,6 +175,7 @@ class GarDataSource extends TableProvider with DataSourceRegister {
     case "csv"     => classOf[CSVFileFormat]
     case "orc"     => classOf[OrcFileFormat]
     case "parquet" => classOf[ParquetFileFormat]
+    case "json"    => classOf[JsonFileFormat]
     case _         => throw new IllegalArgumentException
   }
 }
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
index b6027f8a..804f0f51 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
@@ -24,7 +24,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetInputFormat
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources.parquet.{
@@ -34,6 +36,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{
 }
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory
+import org.apache.spark.sql.execution.datasources.v2.json.JsonPartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.{
@@ -74,6 +77,7 @@ case class GarScan(
       case "csv"     => createCSVReaderFactory()
       case "orc"     => createOrcReaderFactory()
       case "parquet" => createParquetReaderFactory()
+      case "json"    => createJSONReaderFactory()
       case _ =>
        throw new IllegalArgumentException("Invalid format name: " + formatName)
     }
@@ -193,6 +197,46 @@ case class GarScan(
     )
   }
 
+  // Create the reader factory for the JSON format.
+  private def createJSONReaderFactory(): PartitionReaderFactory = {
+    val parsedOptions = new JSONOptionsInRead(
+      CaseInsensitiveMap(options.asScala.toMap),
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord
+    )
+
+    // Check a field requirement for corrupt records here to throw an exception in a driver side
+    ExprUtils.verifyColumnNameOfCorruptRecord(
+      dataSchema,
+      parsedOptions.columnNameOfCorruptRecord
+    )
+    // Don't push any filter which refers to the "virtual" column which cannot present in the input.
+    // Such filters will be applied later on the upper layer.
+    val actualFilters =
+      pushedFilters.filterNot(
+        _.references.contains(parsedOptions.columnNameOfCorruptRecord)
+      )
+
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf =
+      sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    val broadcastedConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf)
+    )
+    // The partition values are already truncated in `FileScan.partitions`.
+    // We should use `readPartitionSchema` as the partition schema here.
+    JsonPartitionReaderFactory(
+      sparkSession.sessionState.conf,
+      broadcastedConf,
+      dataSchema,
+      readDataSchema,
+      readPartitionSchema,
+      parsedOptions,
+      actualFilters
+    )
+  }
+
   /**
    * Override "partitions" of
   * org.apache.spark.sql.execution.datasources.v2.FileScan to disable splitting
@@ -272,6 +316,7 @@ case class GarScan(
     case "csv"     => super.hashCode()
     case "orc"     => getClass.hashCode()
     case "parquet" => getClass.hashCode()
+    case "json"    => super.hashCode()
     case _ =>
       throw new IllegalArgumentException("Invalid format name: " + formatName)
   }
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
index 0ae95894..033fbd34 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
@@ -56,6 +56,7 @@ case class GarScanBuilder(
 
   override def pushedFilters(): Array[Filter] = formatName match {
     case "csv"     => Array.empty[Filter]
+    case "json"    => Array.empty[Filter]
     case "orc"     => pushedOrcFilters
     case "parquet" => pushedParquetFilters
     case _ =>
@@ -87,8 +88,9 @@ case class GarScanBuilder(
   // Check if the file format supports nested schema pruning.
   override protected val supportsNestedSchemaPruning: Boolean =
     formatName match {
-      case "csv" => false
-      case "orc" => sparkSession.sessionState.conf.nestedSchemaPruningEnabled
+      case "csv"  => false
+      case "json" => false
+      case "orc"  => sparkSession.sessionState.conf.nestedSchemaPruningEnabled
       case "parquet" =>
         sparkSession.sessionState.conf.nestedSchemaPruningEnabled
       case _ =>
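
As the GarScanBuilder change above shows, "json" behaves like "csv": no filters are pushed down and nested schema pruning is disabled, so predicates on JSON-backed chunks are evaluated by Spark after the chunk is scanned. A small sketch of what that means for callers, reusing the reader and property group from the earlier sketch (the "gender" column name is an assumption):

    // Filters still apply to JSON-backed chunks; they simply run in Spark on the
    // scanned rows instead of being pushed into the JSON reader.
    val chunk = reader.readVertexPropertyChunk(pg, 0)           // DataFrame
    val filtered = chunk.filter(chunk("gender") === "female")   // evaluated after the scan
    println(filtered.count())
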
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
index acf4943c..df874ea3 100644
--- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
@@ -31,8 +31,11 @@ import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.graphar.csv.CSVWriteBuilder
 import org.apache.spark.sql.graphar.orc.OrcWriteBuilder
 import org.apache.spark.sql.graphar.parquet.ParquetWriteBuilder
+import org.apache.spark.sql.graphar.json.JSONWriteBuilder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.catalyst.json.JSONOptions
 
 import scala.collection.JavaConverters._
 
@@ -82,8 +85,21 @@ case class GarTable(
         OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
       case "parquet" =>
         ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
+      case "json" => {
+        val parsedOptions = new JSONOptions(
+          options.asScala.toMap,
+          sparkSession.sessionState.conf.sessionLocalTimeZone
+        )
+
+        JsonDataSource(parsedOptions).inferSchema(
+          sparkSession,
+          files,
+          parsedOptions
+        )
+      }
       case _ =>
        throw new IllegalArgumentException("Invalid format name: " + formatName)
+
     }
 
   /** Construct a new write builder according to the actual file format. */
@@ -95,6 +111,8 @@ case class GarTable(
         new OrcWriteBuilder(paths, formatName, supportsDataType, info)
       case "parquet" =>
         new ParquetWriteBuilder(paths, formatName, supportsDataType, info)
+      case "json" =>
+        new JSONWriteBuilder(paths, formatName, supportsDataType, info)
       case _ =>
        throw new IllegalArgumentException("Invalid format name: " + formatName)
     }
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
new file mode 100644
index 00000000..150a9a9f
--- /dev/null
+++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
@@ -0,0 +1,73 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Derived from Apache Spark 3.5.1
+// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
+
+package org.apache.spark.sql.graphar.json
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.connector.write.LogicalWriteInfo
+import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
+import org.apache.spark.sql.execution.datasources.{
+  CodecStreams,
+  OutputWriter,
+  OutputWriterFactory
+}
+
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StructType, DataType}
+
+import org.apache.spark.sql.graphar.GarWriteBuilder
+
+class JSONWriteBuilder(
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean,
+    info: LogicalWriteInfo
+) extends GarWriteBuilder(paths, formatName, supportsDataType, info) {
+  override def prepareWrite(
+      sqlConf: SQLConf,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType
+  ): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val parsedOptions = new JSONOptions(
+      options,
+      sqlConf.sessionLocalTimeZone,
+      sqlConf.columnNameOfCorruptRecord
+    )
+    parsedOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext
+      ): OutputWriter = {
+        new JsonOutputWriter(path, parsedOptions, dataSchema, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".json" + CodecStreams.getCompressionExtension(context)
+      }
+    }
+  }
+}
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
index b6094914..e502f82c 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.sql.sources.DataSourceRegister
@@ -173,6 +174,7 @@ class GarDataSource extends TableProvider with DataSourceRegister {
     case "csv"     => classOf[CSVFileFormat]
     case "orc"     => classOf[OrcFileFormat]
     case "parquet" => classOf[ParquetFileFormat]
+    case "json"    => classOf[JsonFileFormat]
     case _         => throw new IllegalArgumentException
   }
 }
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
index feaa7e56..a4d5207b 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
@@ -24,7 +24,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetInputFormat
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources.{
@@ -39,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{
 }
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory
+import org.apache.spark.sql.execution.datasources.v2.json.JsonPartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
 import org.apache.spark.sql.internal.SQLConf
@@ -74,6 +77,7 @@ case class GarScan(
       case "csv"     => createCSVReaderFactory()
       case "orc"     => createOrcReaderFactory()
       case "parquet" => createParquetReaderFactory()
+      case "json"    => createJSONReaderFactory()
       case _ =>
        throw new IllegalArgumentException("Invalid format name: " + formatName)
     }
@@ -203,6 +207,45 @@ case class GarScan(
     )
   }
 
+  // Create the reader factory for the JSON format.
+  private def createJSONReaderFactory(): PartitionReaderFactory = {
+    val parsedOptions = new JSONOptionsInRead(
+      CaseInsensitiveMap(options.asScala.toMap),
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+    // Check a field requirement for corrupt records here to throw an exception in a driver side
+    ExprUtils.verifyColumnNameOfCorruptRecord(
+      dataSchema,
+      parsedOptions.columnNameOfCorruptRecord
+    )
+    // Don't push any filter which refers to the "virtual" column which cannot present in the input.
+    // Such filters will be applied later on the upper layer.
+    val actualFilters =
+      pushedFilters.filterNot(
+        _.references.contains(parsedOptions.columnNameOfCorruptRecord)
+      )
+
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf =
+      sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    val broadcastedConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf)
+    )
+    // The partition values are already truncated in `FileScan.partitions`.
+    // We should use `readPartitionSchema` as the partition schema here.
+    JsonPartitionReaderFactory(
+      sparkSession.sessionState.conf,
+      broadcastedConf,
+      dataSchema,
+      readDataSchema,
+      readPartitionSchema,
+      parsedOptions,
+      actualFilters
+    )
+  }
+
   /**
    * Override "partitions" of
   * org.apache.spark.sql.execution.datasources.v2.FileScan to disable splitting
@@ -280,6 +323,7 @@ case class GarScan(
   /** Get the hash code of the object. */
   override def hashCode(): Int = formatName match {
     case "csv"     => super.hashCode()
+    case "json"    => super.hashCode()
     case "orc"     => getClass.hashCode()
     case "parquet" => getClass.hashCode()
     case _ =>
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
index 94fe5752..3b2ca60e 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
@@ -52,6 +52,7 @@ case class GarScanBuilder(
     this.filters = dataFilters
     formatName match {
       case "csv"     => Array.empty[Filter]
+      case "json"     => Array.empty[Filter]
       case "orc"     => pushedOrcFilters
       case "parquet" => pushedParquetFilters
       case _ =>
@@ -84,6 +85,7 @@ case class GarScanBuilder(
   override protected val supportsNestedSchemaPruning: Boolean =
     formatName match {
       case "csv" => false
+      case "json" => false
       case "orc" => sparkSession.sessionState.conf.nestedSchemaPruningEnabled
       case "parquet" =>
         sparkSession.sessionState.conf.nestedSchemaPruningEnabled
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
index acf4943c..e24e9051 100644
--- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
@@ -31,8 +31,11 @@ import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.graphar.csv.CSVWriteBuilder
 import org.apache.spark.sql.graphar.orc.OrcWriteBuilder
 import org.apache.spark.sql.graphar.parquet.ParquetWriteBuilder
+import org.apache.spark.sql.graphar.json.JSONWriteBuilder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.catalyst.json.JSONOptions
 
 import scala.collection.JavaConverters._
 
@@ -82,8 +85,21 @@ case class GarTable(
         OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
       case "parquet" =>
         ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
+      case "json" => {
+         val parsedOptions = new JSONOptions(
+           options.asScala.toMap,
+           sparkSession.sessionState.conf.sessionLocalTimeZone
+         )
+
+         JsonDataSource(parsedOptions).inferSchema(
+           sparkSession,
+           files,
+           parsedOptions
+         )
+      }
       case _ =>
        throw new IllegalArgumentException("Invalid format name: " + formatName)
+        
     }
 
   /** Construct a new write builder according to the actual file format. */
@@ -95,6 +111,8 @@ case class GarTable(
         new OrcWriteBuilder(paths, formatName, supportsDataType, info)
       case "parquet" =>
         new ParquetWriteBuilder(paths, formatName, supportsDataType, info)
+      case "json" =>
+        new JSONWriteBuilder(paths, formatName, supportsDataType, info)
       case _ =>
        throw new IllegalArgumentException("Invalid format name: " + formatName)
     }
diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
new file mode 100644
index 00000000..150a9a9f
--- /dev/null
+++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/json/JSONWriteBuilder.scala
@@ -0,0 +1,73 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Derived from Apache Spark 3.5.1
+// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
+
+package org.apache.spark.sql.graphar.json
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.connector.write.LogicalWriteInfo
+import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
+import org.apache.spark.sql.execution.datasources.{
+  CodecStreams,
+  OutputWriter,
+  OutputWriterFactory
+}
+
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StructType, DataType}
+
+import org.apache.spark.sql.graphar.GarWriteBuilder
+
+class JSONWriteBuilder(
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean,
+    info: LogicalWriteInfo
+) extends GarWriteBuilder(paths, formatName, supportsDataType, info) {
+  override def prepareWrite(
+      sqlConf: SQLConf,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType
+  ): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val parsedOptions = new JSONOptions(
+      options,
+      sqlConf.sessionLocalTimeZone,
+      sqlConf.columnNameOfCorruptRecord
+    )
+    parsedOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext
+      ): OutputWriter = {
+        new JsonOutputWriter(path, parsedOptions, dataSchema, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".json" + CodecStreams.getCompressionExtension(context)
+      }
+    }
+  }
+}
diff --git a/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala b/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala
index 36804617..5c410d65 100644
--- a/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala
+++ b/maven-projects/spark/graphar/src/main/scala/org/apache/graphar/GraphInfo.scala
@@ -96,6 +96,7 @@ object FileType extends Enumeration {
   val CSV = Value(0)
   val PARQUET = Value(1)
   val ORC = Value(2)
+  val JSON = Value(3)
 
   /**
    * File type to string.
@@ -109,6 +110,7 @@ object FileType extends Enumeration {
     case FileType.CSV     => "csv"
     case FileType.PARQUET => "parquet"
     case FileType.ORC     => "orc"
+    case FileType.JSON    => "json"
     case _ => throw new IllegalArgumentException("Unknown file type")
   }
 
@@ -124,6 +126,7 @@ object FileType extends Enumeration {
     case "csv"     => FileType.CSV
     case "parquet" => FileType.PARQUET
     case "orc"     => FileType.ORC
+    case "json"    => FileType.JSON
     case _ => throw new IllegalArgumentException("Unknown file type: " + str)
   }
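
The Scala FileType enumeration now mirrors the C++ enum. A tiny round-trip sketch, assuming the string-conversion helpers in GraphInfo.scala keep the FileTypeToString/StringToFileType names used on the C++ side:

    import org.apache.graphar.FileType

    // "json" <-> FileType.JSON round trip; helper names assumed as noted above.
    assert(FileType.FileTypeToString(FileType.JSON) == "json")
    assert(FileType.StringToFileType("json") == FileType.JSON)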
 
diff --git a/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala b/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
index 2aafb29d..f61710b9 100644
--- a/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
+++ b/maven-projects/spark/graphar/src/test/scala/org/apache/graphar/TestReader.scala
@@ -191,6 +191,25 @@ class ReaderSuite extends BaseTestSuite {
     )
   }
 
+  test("json test") {
+    // construct the vertex information
+    val prefix = testData + "/ldbc_sample/json/"
+    val vertex_yaml = prefix + "Person.vertex.yml"
+    val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)
+
+    // construct the vertex reader
+    val reader = new VertexReader(prefix, vertex_info, spark)
+
+    // test reading the number of vertices
+    assert(reader.readVerticesNumber() == 903)
+    val property_group = vertex_info.getPropertyGroup("gender")
+
+    // test reading a single property chunk
+    val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
+    assert(single_chunk_df.columns.length == 5)
+    assert(single_chunk_df.count() == 100)
+  }
+
   test("read edge chunks") {
     // construct the edge information
     val prefix = testData + "/ldbc_sample/csv/"
diff --git a/pyspark/graphar_pyspark/enums.py b/pyspark/graphar_pyspark/enums.py
index 1e629026..54094d00 100644
--- a/pyspark/graphar_pyspark/enums.py
+++ b/pyspark/graphar_pyspark/enums.py
@@ -58,6 +58,7 @@ class FileType(Enum):
     """Type of  file format."""
 
     CSV = "csv"
+    JSON = "json"
     PARQUET = "parquet"
     ORC = "orc"
 
diff --git a/pyspark/graphar_pyspark/graph.py b/pyspark/graphar_pyspark/graph.py
index 1c8f8043..eeab184c 100644
--- a/pyspark/graphar_pyspark/graph.py
+++ b/pyspark/graphar_pyspark/graph.py
@@ -195,7 +195,7 @@ class GraphWriter:
         :param name: the name of graph, default is 'grpah'
         :param vertex_chunk_size: the chunk size for vertices, default is 2^18
         :param edge_chunk_size: the chunk size for edges, default is 2^22
-        :param file_type: the file type for data payload file, support [parquet, orc, csv], default is parquet.
+        :param file_type: the file type for data payload file, support [parquet, orc, csv, json], default is parquet.
         :param version: version of GraphAr format, default is v1.
         """
         if vertex_chunk_size is None:
diff --git a/pyspark/tests/test_reader.py b/pyspark/tests/test_reader.py
index a92a399f..9736c508 100644
--- a/pyspark/tests/test_reader.py
+++ b/pyspark/tests/test_reader.py
@@ -100,6 +100,45 @@ def test_edge_reader(spark):
     assert edge_reader.read_edges_number(0) == 0
     assert edge_reader.read_offset(0).count() > 0
 
+def test_vertex_reader_with_json(spark):
+    initialize(spark)
+
+    vertex_info = VertexInfo.load_vertex_info(
+        GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json")
+        .joinpath("Person.vertex.yml")
+        .absolute()
+        .__str__()
+    )
+    vertex_reader = VertexReader.from_python(
+        GRAPHAR_TESTS_EXAMPLES.joinpath("ldbc_sample/json/").absolute().__str__(),
+        vertex_info,
+    )
+    assert VertexReader.from_scala(vertex_reader.to_scala()) is not None
+    assert vertex_reader.read_vertices_number() > 0
+    assert (
+        vertex_reader.read_vertex_property_group(
+            vertex_info.get_property_group("firstName")
+        ).count()
+        > 0
+    )
+    assert (
+        vertex_reader.read_vertex_property_chunk(
+            vertex_info.get_property_groups()[0], 0
+        ).count()
+        > 0
+    )
+    assert (
+        vertex_reader.read_all_vertex_property_groups().count()
+        >= vertex_reader.read_vertex_property_group(
+            vertex_info.get_property_group("lastName")
+        ).count()
+    )
+    assert (
+        vertex_reader.read_multiple_vertex_property_groups(
+            [vertex_info.get_property_group("gender")]
+        ).count()
+        > 0
+    )
 
 def test_graph_reader(spark):
     initialize(spark)
diff --git a/testing b/testing
index 5066ecba..5419fc79 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 5066ecba2ae20e4540d4199679da7957f85adf4c
+Subproject commit 5419fc79c43c7dc37d2b3288da6ea319b44a7042


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

