amygbAI commented on PR #488:
URL:
https://github.com/apache/incubator-graphar/pull/488#issuecomment-2147591419
hi @acezen, this is proving to be a lot trickier than just copying the implementation locally and making it public. Here's what I have tried so far (in both Spark 3.2 and 3.3, same issues):
```
package org.apache.graphar.datasources.json

import java.nio.charset.{Charset, StandardCharsets}
import java.time.ZoneId
import java.util.Locale

import com.fasterxml.jackson.core.{JsonFactory, JsonFactoryBuilder}
import com.fasterxml.jackson.core.json.JsonReadFeature

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

/**
 * Options for parsing JSON data into Spark SQL rows.
 *
 * Most of these map directly to Jackson's internal options, specified in [[JsonReadFeature]].
 */
class JSONOptions(
    @transient val parameters: CaseInsensitiveMap[String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String)
  extends Logging with Serializable {

  def this(
      parameters: Map[String, String],
      defaultTimeZoneId: String,
      defaultColumnNameOfCorruptRecord: String = "") = {
    this(
      CaseInsensitiveMap(parameters),
      defaultTimeZoneId,
      defaultColumnNameOfCorruptRecord)
  }

  val samplingRatio =
    parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
  val primitivesAsString =
    parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
  val prefersDecimal =
    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
  val allowComments =
    parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
  val allowUnquotedFieldNames =
    parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false)
  val allowSingleQuotes =
    parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true)
  val allowNumericLeadingZeros =
    parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false)
  val allowNonNumericNumbers =
    parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
  val allowBackslashEscapingAnyCharacter =
    parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
  private val allowUnquotedControlChars =
    parameters.get("allowUnquotedControlChars").map(_.toBoolean).getOrElse(false)
  val compressionCodec =
    parameters.get("compression").map(CompressionCodecs.getCodecClassName)
  val parseMode: ParseMode =
    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
  val columnNameOfCorruptRecord =
    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

  // Whether to ignore column of all null values or empty array/struct during schema inference
  val dropFieldIfAllNull =
    parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)

  // Whether to ignore null fields during json generating
  val ignoreNullFields = parameters.get("ignoreNullFields").map(_.toBoolean)
    .getOrElse(SQLConf.get.jsonGeneratorIgnoreNullFields)

  // A language tag in IETF BCP 47 format
  val locale: Locale =
    parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

  val zoneId: ZoneId = DateTimeUtils.getZoneId(
    parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

  val dateFormat: String =
    parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

  val timestampFormat: String = parameters.getOrElse("timestampFormat",
    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
    } else {
      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
    })

  val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
    require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
    sep
  }

  protected def checkedEncoding(enc: String): String = enc

  /**
   * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
   * If the encoding is not specified (None) in read, it will be detected automatically
   * when the multiLine option is set to `true`. If encoding is not specified in write,
   * UTF-8 is used by default.
   */
  val encoding: Option[String] = parameters.get("encoding")
    .orElse(parameters.get("charset")).map(checkedEncoding)

  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
    lineSep.getBytes(encoding.getOrElse(StandardCharsets.UTF_8.name()))
  }
  val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")

  /**
   * Generating JSON strings in pretty representation if the parameter is enabled.
   */
  val pretty: Boolean = parameters.get("pretty").map(_.toBoolean).getOrElse(false)

  val inferTimestamp: Boolean =
    parameters.get("inferTimestamp").map(_.toBoolean).getOrElse(false)

  /**
   * Generating \u0000 style codepoints for non-ASCII characters if the parameter is enabled.
   */
  val writeNonAsciiCharacterAsCodePoint: Boolean =
    parameters.get("writeNonAsciiCharacterAsCodePoint").map(_.toBoolean).getOrElse(false)

  /** Build a Jackson [[JsonFactory]] using JSON options. */
  def buildJsonFactory(): JsonFactory = {
    new JsonFactoryBuilder()
      .configure(JsonReadFeature.ALLOW_JAVA_COMMENTS, allowComments)
      .configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
      .configure(JsonReadFeature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
      .configure(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS, allowNumericLeadingZeros)
      .configure(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
      .configure(
        JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
        allowBackslashEscapingAnyCharacter)
      .configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS, allowUnquotedControlChars)
      .build()
  }
}

class JSONOptionsInRead(
    @transient override val parameters: CaseInsensitiveMap[String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String)
  extends JSONOptions(parameters, defaultTimeZoneId, defaultColumnNameOfCorruptRecord) {

  def this(
      parameters: Map[String, String],
      defaultTimeZoneId: String,
      defaultColumnNameOfCorruptRecord: String = "") = {
    this(
      CaseInsensitiveMap(parameters),
      defaultTimeZoneId,
      defaultColumnNameOfCorruptRecord)
  }

  protected override def checkedEncoding(enc: String): String = {
    val isDenied = JSONOptionsInRead.denyList.contains(Charset.forName(enc))
    require(multiLine || !isDenied,
      s"""The $enc encoding must not be included in the denyList when multiLine is disabled:
         |denylist: ${JSONOptionsInRead.denyList.mkString(", ")}""".stripMargin)
    val isLineSepRequired =
      multiLine || Charset.forName(enc) == StandardCharsets.UTF_8 || lineSeparator.nonEmpty
    require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")
    enc
  }
}

object JSONOptionsInRead {
  // The following encodings are not supported in per-line mode (multiline is false)
  // because they cause some problems in reading files with BOM which is supposed to
  // present in the files with such encodings. After splitting input files by lines,
  // only the first lines will have the BOM which leads to impossibility for reading
  // the rest lines. Besides of that, the lineSep option must have the BOM in such
  // encodings which can never present between lines.
  val denyList = Seq(
    Charset.forName("UTF-16"),
    Charset.forName("UTF-32")
  )
}
```
So basically, wherever `private` was used I removed it in the local copy. BUT then I get an error from the following code in the JSONWriterBuilder.scala file I had written (similar to CSVWriterBuilder.scala):
```
new OutputWriterFactory {
  override def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter = {
    new JsonOutputWriter(path, parsedOptions, dataSchema, context)
  }

  override def getFileExtension(context: TaskAttemptContext): String = {
    ".json" + CodecStreams.getCompressionExtension(context)
  }
}
```
_[ERROR] [Error] /datadrive/GRAPH_AR/act_git_clone/incubator-graphar/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/json/JSONWriterBuilder.scala:64: type mismatch;
found&nbsp;&nbsp;&nbsp;: JSONOptions (in org.apache.graphar.datasources.json)
required: JSONOptions (in org.apache.spark.sql.catalyst.json)_
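For context, the mismatch is structural rather than cosmetic: Spark's `JsonOutputWriter` is typed against the catalyst `JSONOptions`, so the locally copied class is an unrelated type as far as scalac is concerned, even though the source text is identical. A minimal sketch of the collision (the constructor arguments below are hypothetical placeholders; the `JsonOutputWriter` signature is as I read it in the Spark 3.2/3.3 sources):
```
// Spark's writer is declared against the catalyst options class:
//   class JsonOutputWriter(
//       val path: String,
//       options: org.apache.spark.sql.catalyst.json.JSONOptions,
//       dataSchema: StructType,
//       context: TaskAttemptContext)
//
// ...while our builder hands it the copied class:
val parsedOptions = new org.apache.graphar.datasources.json.JSONOptions(
  options,           // hypothetical: the writer's CaseInsensitiveMap of options
  "UTC",             // hypothetical: session-local time zone
  "_corrupt_record") // default corrupt-record column name

// new JsonOutputWriter(path, parsedOptions, dataSchema, context)
//    => type mismatch: graphar JSONOptions is not catalyst JSONOptions
```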
So I tried replacing JsonOutputWriter with a custom JsonOutputWriter:
```
package org.apache.graphar.datasources.json

import java.nio.charset.{Charset, StandardCharsets}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JacksonGenerator
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
import org.apache.spark.sql.types.StructType

class CustomJsonOutputWriter(
    val path: String, // val so it also satisfies OutputWriter.path()
    options: JSONOptions,
    dataSchema: StructType,
    context: TaskAttemptContext)
  extends OutputWriter {

  private val writer = CodecStreams.createOutputStreamWriter(
    context,
    new Path(path),
    options.encoding.map(Charset.forName).getOrElse(StandardCharsets.UTF_8))

  // This is where it falls apart: JacksonGenerator is private[sql],
  // and it also expects the catalyst JSONOptions, not our copied one.
  private val generator = new JacksonGenerator(dataSchema, writer, options)

  override def write(row: InternalRow): Unit = {
    generator.write(row)
    generator.writeLineEnding()
  }

  override def close(): Unit = {
    generator.close()
    writer.close()
  }
}
```
**But this has tons of issues because JacksonGenerator itself is private.**
It would be nice to hear what the community thinks about this. Or do we just keep JSON read-only until we support Spark 3.5?
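One escape hatch that might be worth discussing (a hedged sketch only, not something I have verified against 3.2/3.3): since `JacksonGenerator` and the catalyst `JSONOptions` are `private[sql]`, a writer declared under the `org.apache.spark.sql` package namespace can see both without copying anything. Package and class names below are hypothetical:
```
// Sketch: compiling this file into the org.apache.spark.sql namespace makes
// private[sql] members visible to it. Untested; names are placeholders.
package org.apache.spark.sql.graphar.json

import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
import org.apache.spark.sql.types.StructType

class GarJsonOutputWriter( // hypothetical name
    val path: String,
    options: JSONOptions, // the catalyst class, visible from this package
    dataSchema: StructType,
    context: TaskAttemptContext)
  extends OutputWriter {

  private val writer =
    CodecStreams.createOutputStreamWriter(context, new Path(path), StandardCharsets.UTF_8)
  private val gen = new JacksonGenerator(dataSchema, writer, options)

  override def write(row: InternalRow): Unit = {
    gen.write(row)
    gen.writeLineEnding()
  }

  override def close(): Unit = {
    gen.close()
    writer.close()
  }
}
```
The obvious downside is that we would be squatting in Spark's namespace, which is fragile across Spark upgrades, so it may still be preferable to keep JSON read-only for now.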