This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch scala3 in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit 19c34a2302f92748bb1156e7f85d8a0339ab35fa Author: PJ Fanning <[email protected]> AuthorDate: Thu Jun 8 22:00:41 2023 +0100 Scala3 support for more google connectors (#143) * support scala3 (google-common) * support scala3 on more google connectors * continue * imports * more implicit issues * Update BigQueryCollectionFormats.scala * further issues * class access issues * Update BigQueryRestBasicFormats.scala * use compactPrint because toString causes tests to fail in Scala3 * review issue --- .../elasticsearch/scaladsl/ElasticsearchFlow.scala | 2 +- .../scala/docs/scaladsl/ElasticsearchV5Spec.scala | 6 ++--- .../scala/docs/scaladsl/ElasticsearchV7Spec.scala | 6 ++--- .../scala/docs/scaladsl/OpensearchV1Spec.scala | 6 ++--- .../bigquery/model/DatasetJsonProtocol.scala | 6 ++--- .../bigquery/model/ErrorProtoJsonProtocol.scala | 3 ++- .../bigquery/model/JobJsonProtocol.scala | 21 +++++++++------- .../bigquery/model/QueryJsonProtocol.scala | 9 ++++--- .../bigquery/model/TableDataJsonProtocol.scala | 13 +++++----- .../bigquery/model/TableJsonProtocol.scala | 15 ++++++------ .../scaladsl/spray/BigQueryCollectionFormats.scala | 26 +++++++++++++------- .../scaladsl/spray/BigQueryRestBasicFormats.scala | 24 +++++++++---------- .../src/test/scala/docs/scaladsl/BigQueryDoc.scala | 12 ++++++---- .../connectors/googlecloud/bigquery/e2e/A.scala | 4 ++-- .../e2e/scaladsl/BigQueryEndToEndSpec.scala | 18 +++++++------- .../bigquery/scaladsl/BigQueryQueriesSpec.scala | 12 +++++----- .../scaladsl/schema/BigQuerySchemasSpec.scala | 10 ++++---- .../scaladsl/spray/BigQueryJsonProtocolSpec.scala | 4 ++-- .../googlecloud/pubsub/impl/PubSubApi.scala | 28 ++++++++++++---------- .../googlecloud/pubsub/impl/PubSubApiSpec.scala | 2 +- .../stream/connectors/google/GoogleSettings.scala | 6 +++-- .../stream/connectors/google/ResumableUpload.scala | 1 - .../connectors/google/auth/NoCredentials.scala | 2 +- project/Dependencies.scala | 2 -- 24 files changed, 128 insertions(+), 110 deletions(-) diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala index 463848cb2..3c80a959e 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala @@ -221,7 +221,7 @@ object ElasticsearchFlow { } private final class SprayJsonWriter[T](implicit writer: JsonWriter[T]) extends MessageWriter[T] { - override def convert(message: T): String = message.toJson.toString() + override def convert(message: T): String = message.toJson.compactPrint } } diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala index 52b27dc11..7bc0bb25b 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala @@ -158,9 +158,9 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt // #string val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source( immutable.Seq( - WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()), - WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()), - WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString()))).via( + WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint), + WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint), + WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via( ElasticsearchFlow.create( constructElasticsearchParams(indexName, "_doc", ApiVersion.V5), settings = baseWriteSettings, diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala index 1d2522454..bccd5afeb 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala @@ -147,9 +147,9 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source( immutable.Seq( - WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()), - WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()), - WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString()))).via( + WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint), + WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint), + WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via( ElasticsearchFlow.create( constructElasticsearchParams(indexName, "_doc", ApiVersion.V7), settings = baseWriteSettings, diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala index 3ba59ee55..81a093399 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala @@ -163,9 +163,9 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils // #string val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source( immutable.Seq( - WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()), - WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()), - WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString()))).via( + WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint), + WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint), + WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via( ElasticsearchFlow.create( constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), settings = baseWriteSettings, diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/DatasetJsonProtocol.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/DatasetJsonProtocol.scala index 3e0166776..ef9ff12ca 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/DatasetJsonProtocol.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/DatasetJsonProtocol.scala @@ -32,7 +32,7 @@ import scala.collection.immutable.Seq * @param labels the labels associated with this dataset * @param location the geographic location where the dataset should reside */ -final case class Dataset private (datasetReference: DatasetReference, +final case class Dataset private[bigquery] (datasetReference: DatasetReference, friendlyName: Option[String], labels: Option[Map[String, String]], location: Option[String]) { @@ -87,7 +87,7 @@ object Dataset { * @param datasetId A unique ID for this dataset, without the project name * @param projectId The ID of the project containing this dataset */ -final case class DatasetReference private (datasetId: Option[String], projectId: Option[String]) { +final case class DatasetReference private[bigquery] (datasetId: Option[String], projectId: Option[String]) { def getDatasetId = datasetId.toJava def getProjectId = projectId.toJava @@ -126,7 +126,7 @@ object DatasetReference { * @param nextPageToken a token that can be used to request the next results page * @param datasets an array of the dataset resources in the project */ -final case class DatasetListResponse private (nextPageToken: Option[String], datasets: Option[Seq[Dataset]]) { +final case class DatasetListResponse private[bigquery] (nextPageToken: Option[String], datasets: Option[Seq[Dataset]]) { def getNextPageToken = nextPageToken.toJava def getDatasets = datasets.map(_.asJava).toJava diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/ErrorProtoJsonProtocol.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/ErrorProtoJsonProtocol.scala index 8aec41cdf..a454b0ae8 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/ErrorProtoJsonProtocol.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/ErrorProtoJsonProtocol.scala @@ -31,7 +31,8 @@ import scala.annotation.nowarn * @param location specifies where the error occurred, if present * @param message A human-readable description of the error */ -final case class ErrorProto private (reason: Option[String], location: Option[String], message: Option[String]) { +final case class ErrorProto private[bigquery] (reason: Option[String], location: Option[String], + message: Option[String]) { @nowarn("msg=never used") @JsonCreator diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/JobJsonProtocol.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/JobJsonProtocol.scala index c53722c02..bdd04a5ab 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/JobJsonProtocol.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/JobJsonProtocol.scala @@ -33,7 +33,7 @@ import scala.collection.immutable.Seq * @param jobReference reference describing the unique-per-user name of the job * @param status the status of this job */ -final case class Job private (configuration: Option[JobConfiguration], +final case class Job private[bigquery] (configuration: Option[JobConfiguration], jobReference: Option[JobReference], status: Option[JobStatus]) { @@ -83,7 +83,8 @@ object Job { * @param load configures a load job * @param labels the labels associated with this job */ -final case class JobConfiguration private (load: Option[JobConfigurationLoad], labels: Option[Map[String, String]]) { +final case class JobConfiguration private[bigquery] (load: Option[JobConfigurationLoad], + labels: Option[Map[String, String]]) { def getLoad = load.toJava def getLabels = labels.toJava @@ -144,7 +145,7 @@ object JobConfiguration { * @param writeDisposition specifies the action that occurs if the destination table already exists * @param sourceFormat the format of the data files */ -final case class JobConfigurationLoad private (schema: Option[TableSchema], +final case class JobConfigurationLoad private[bigquery] (schema: Option[TableSchema], destinationTable: Option[TableReference], createDisposition: Option[CreateDisposition], writeDisposition: Option[WriteDisposition], @@ -210,7 +211,7 @@ object JobConfigurationLoad { implicit val configurationLoadFormat: JsonFormat[JobConfigurationLoad] = jsonFormat5(apply) } -final case class CreateDisposition private (value: String) extends StringEnum +final case class CreateDisposition private[bigquery] (value: String) extends StringEnum object CreateDisposition { /** @@ -227,7 +228,7 @@ object CreateDisposition { implicit val format: JsonFormat[CreateDisposition] = StringEnum.jsonFormat(apply) } -final case class WriteDisposition private (value: String) extends StringEnum +final case class WriteDisposition private[bigquery] (value: String) extends StringEnum object WriteDisposition { /** @@ -269,7 +270,8 @@ object SourceFormat { * @param jobId the ID of the job * @param location the geographic location of the job */ -final case class JobReference private (projectId: Option[String], jobId: Option[String], location: Option[String]) { +final case class JobReference private[bigquery] (projectId: Option[String], jobId: Option[String], + location: Option[String]) { @nowarn("msg=never used") @JsonCreator @@ -323,7 +325,8 @@ object JobReference { * @param errors the first errors encountered during the running of the job * @param state running state of the job */ -final case class JobStatus private (errorResult: Option[ErrorProto], errors: Option[Seq[ErrorProto]], state: JobState) { +final case class JobStatus private[bigquery] (errorResult: Option[ErrorProto], errors: Option[Seq[ErrorProto]], + state: JobState) { def getErrorResult = errorResult.toJava def getErrors = errors.map(_.asJava).toJava @@ -360,7 +363,7 @@ object JobStatus { implicit val format: JsonFormat[JobStatus] = jsonFormat3(apply) } -final case class JobState private (value: String) extends StringEnum +final case class JobState private[bigquery] (value: String) extends StringEnum object JobState { /** @@ -380,7 +383,7 @@ object JobState { implicit val format: JsonFormat[JobState] = StringEnum.jsonFormat(apply) } -final case class JobCancelResponse private (job: Job) { +final case class JobCancelResponse private[bigquery] (job: Job) { def getJob = job def withJob(job: Job) = copy(job = job) diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/QueryJsonProtocol.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/QueryJsonProtocol.scala index 56ce06f02..7149d8225 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/QueryJsonProtocol.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/QueryJsonProtocol.scala @@ -21,11 +21,10 @@ import pekko.util.ccompat.JavaConverters._ import pekko.util.JavaDurationConverters._ import pekko.util.OptionConverters._ import com.fasterxml.jackson.annotation.{ JsonCreator, JsonIgnoreProperties, JsonProperty } -import spray.json.{ RootJsonFormat, RootJsonReader } +import spray.json.{ JsonFormat, RootJsonFormat, RootJsonReader } import java.time.Duration import java.{ lang, util } - import scala.annotation.nowarn import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable.Seq @@ -46,7 +45,7 @@ import scala.concurrent.duration.FiniteDuration * @param maximumBytesBilled limits the number of bytes billed for this query * @param requestId a unique user provided identifier to ensure idempotent behavior for queries */ -final case class QueryRequest private (query: String, +final case class QueryRequest private[bigquery] (query: String, maxResults: Option[Int], defaultDataset: Option[DatasetReference], timeout: Option[FiniteDuration], @@ -192,7 +191,7 @@ object QueryRequest { * @tparam T the data model for each row */ @JsonIgnoreProperties(ignoreUnknown = true) -final case class QueryResponse[+T] private (schema: Option[TableSchema], +final case class QueryResponse[+T] private[bigquery] (schema: Option[TableSchema], jobReference: JobReference, totalRows: Option[Long], pageToken: Option[String], @@ -329,7 +328,7 @@ object QueryResponse { implicit def reader[T <: AnyRef]( implicit reader: BigQueryRootJsonReader[T]): RootJsonReader[QueryResponse[T]] = { - implicit val format = lift(reader) + implicit val format: JsonFormat[T] = lift(reader) jsonFormat10(QueryResponse[T]) } implicit val paginated: Paginated[QueryResponse[Any]] = _.pageToken diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableDataJsonProtocol.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableDataJsonProtocol.scala index fa0451534..1ca69cc74 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableDataJsonProtocol.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableDataJsonProtocol.scala @@ -38,7 +38,8 @@ import scala.collection.immutable.Seq * @tparam T the data model of each row */ @JsonIgnoreProperties(ignoreUnknown = true) -final case class TableDataListResponse[+T] private (totalRows: Long, pageToken: Option[String], rows: Option[Seq[T]]) { +final case class TableDataListResponse[+T] private[bigquery] (totalRows: Long, pageToken: Option[String], + rows: Option[Seq[T]]) { @nowarn("msg=never used") @JsonCreator @@ -82,7 +83,7 @@ object TableDataListResponse { implicit def reader[T <: AnyRef]( implicit reader: BigQueryRootJsonReader[T]): RootJsonReader[TableDataListResponse[T]] = { - implicit val format = lift(reader) + implicit val format: JsonFormat[T] = lift(reader) jsonFormat3(TableDataListResponse[T]) } implicit val paginated: Paginated[TableDataListResponse[Any]] = _.pageToken @@ -99,7 +100,7 @@ object TableDataListResponse { * @tparam T the data model of each row */ @JsonInclude(Include.NON_NULL) -final case class TableDataInsertAllRequest[+T] private (skipInvalidRows: Option[Boolean], +final case class TableDataInsertAllRequest[+T] private[bigquery] (skipInvalidRows: Option[Boolean], ignoreUnknownValues: Option[Boolean], templateSuffix: Option[String], rows: Seq[Row[T]]) { @@ -179,7 +180,7 @@ object TableDataInsertAllRequest { * @param json the record this row contains * @tparam T the data model of the record */ -final case class Row[+T] private (insertId: Option[String], json: T) { +final case class Row[+T] private[bigquery] (insertId: Option[String], json: T) { def getInsertId = insertId.toJava def getJson = json @@ -212,7 +213,7 @@ object Row { * TableDataInsertAllResponse model * @see [[https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll#response-body BigQuery reference]] */ -final case class TableDataInsertAllResponse private (insertErrors: Option[Seq[InsertError]]) { +final case class TableDataInsertAllResponse private[bigquery] (insertErrors: Option[Seq[InsertError]]) { def getInsertErrors = insertErrors.map(_.asJava).toJava def withInsertErrors(insertErrors: Option[Seq[InsertError]]) = @@ -239,7 +240,7 @@ object TableDataInsertAllResponse { * InsertError model * @see [[https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll#response-body BigQuery reference]] */ -final case class InsertError private (index: Int, errors: Option[Seq[ErrorProto]]) { +final case class InsertError private[bigquery] (index: Int, errors: Option[Seq[ErrorProto]]) { def getIndex = index def getErrors = errors.map(_.asJava).toJava diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableJsonProtocol.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableJsonProtocol.scala index d193fb4d7..60bdae7f3 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableJsonProtocol.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/model/TableJsonProtocol.scala @@ -37,7 +37,7 @@ import scala.collection.immutable.Seq * @param numRows the number of rows of data in this table * @param location the geographic location where the table resides */ -final case class Table private (tableReference: TableReference, +final case class Table private[bigquery] (tableReference: TableReference, labels: Option[Map[String, String]], schema: Option[TableSchema], numRows: Option[Long], @@ -109,7 +109,8 @@ object Table { * @param datasetId the ID of the dataset containing this table * @param tableId the ID of the table */ -final case class TableReference private (projectId: Option[String], datasetId: String, tableId: Option[String]) { +final case class TableReference private[bigquery] (projectId: Option[String], datasetId: String, + tableId: Option[String]) { def getProjectId = projectId.toJava def getDatasetId = datasetId @@ -152,7 +153,7 @@ object TableReference { * * @param fields describes the fields in a table */ -final case class TableSchema private (fields: Seq[TableFieldSchema]) { +final case class TableSchema private[bigquery] (fields: Seq[TableFieldSchema]) { @nowarn("msg=never used") @JsonCreator @@ -200,7 +201,7 @@ object TableSchema { * @param mode the field mode * @param fields describes the nested schema fields if the type property is set to `RECORD` */ -final case class TableFieldSchema private (name: String, +final case class TableFieldSchema private[bigquery] (name: String, `type`: TableFieldSchemaType, mode: Option[TableFieldSchemaMode], fields: Option[Seq[TableFieldSchema]]) { @@ -278,7 +279,7 @@ object TableFieldSchema { jsonFormat(apply, "name", "type", "mode", "fields")) } -final case class TableFieldSchemaType private (value: String) extends StringEnum +final case class TableFieldSchemaType private[bigquery] (value: String) extends StringEnum object TableFieldSchemaType { /** @@ -328,7 +329,7 @@ object TableFieldSchemaType { implicit val format: JsonFormat[TableFieldSchemaType] = StringEnum.jsonFormat(apply) } -final case class TableFieldSchemaMode private (value: String) extends StringEnum +final case class TableFieldSchemaMode private[bigquery] (value: String) extends StringEnum object TableFieldSchemaMode { /** @@ -356,7 +357,7 @@ object TableFieldSchemaMode { * @param tables tables in the requested dataset * @param totalItems the total number of tables in the dataset */ -final case class TableListResponse private (nextPageToken: Option[String], +final case class TableListResponse private[bigquery] (nextPageToken: Option[String], tables: Option[Seq[Table]], totalItems: Option[Int]) { diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala index 99552c19b..847498fef 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryCollectionFormats.scala @@ -44,18 +44,26 @@ trait BigQueryCollectionFormats { import collection.{ immutable => imm } - implicit def immIterableFormat[T: BigQueryJsonFormat] = viaSeq[imm.Iterable[T], T](seq => imm.Iterable(seq: _*)) - implicit def immSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.Seq[T], T](seq => imm.Seq(seq: _*)) - implicit def immIndexedSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.IndexedSeq[T], T](seq => imm.IndexedSeq(seq: _*)) - implicit def immLinearSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.LinearSeq[T], T](seq => imm.LinearSeq(seq: _*)) - implicit def vectorFormat[T: BigQueryJsonFormat] = viaSeq[Vector[T], T](seq => Vector(seq: _*)) + implicit def immIterableFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.Iterable[T]] = + viaSeq[imm.Iterable[T], T](seq => imm.Iterable(seq: _*)) + implicit def immSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.Seq[T]] = + viaSeq[imm.Seq[T], T](seq => imm.Seq(seq: _*)) + implicit def immIndexedSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.IndexedSeq[T]] = + viaSeq[imm.IndexedSeq[T], T](seq => imm.IndexedSeq(seq: _*)) + implicit def immLinearSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.LinearSeq[T]] = + viaSeq[imm.LinearSeq[T], T](seq => imm.LinearSeq(seq: _*)) + implicit def vectorFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[Vector[T]] = + viaSeq[Vector[T], T](seq => Vector(seq: _*)) import collection._ - implicit def iterableFormat[T: BigQueryJsonFormat] = viaSeq[Iterable[T], T](seq => Iterable(seq: _*)) - implicit def seqFormat[T: BigQueryJsonFormat] = viaSeq[Seq[T], T](seq => Seq(seq: _*)) - implicit def indexedSeqFormat[T: BigQueryJsonFormat] = viaSeq[IndexedSeq[T], T](seq => IndexedSeq(seq: _*)) - implicit def linearSeqFormat[T: BigQueryJsonFormat] = viaSeq[LinearSeq[T], T](seq => LinearSeq(seq: _*)) + implicit def iterableFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[Iterable[T]] = + viaSeq[Iterable[T], T](seq => Iterable(seq: _*)) + implicit def seqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[Seq[T]] = viaSeq[Seq[T], T](seq => Seq(seq: _*)) + implicit def indexedSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[IndexedSeq[T]] = + viaSeq[IndexedSeq[T], T](seq => IndexedSeq(seq: _*)) + implicit def linearSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[collection.LinearSeq[T]] = + viaSeq[collection.LinearSeq[T], T](seq => collection.LinearSeq(seq: _*)) /** * A BigQueryJsonFormat construction helper that creates a BigQueryJsonFormat for an Iterable type I from a builder function diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala index 3dcd414e0..8c84f2997 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryRestBasicFormats.scala @@ -22,18 +22,18 @@ import scala.concurrent.duration.{ DurationLong, FiniteDuration } */ trait BigQueryRestBasicFormats { - implicit val IntJsonFormat = DefaultJsonProtocol.IntJsonFormat - implicit val FloatJsonFormat = DefaultJsonProtocol.FloatJsonFormat - implicit val DoubleJsonFormat = DefaultJsonProtocol.DoubleJsonFormat - implicit val ByteJsonFormat = DefaultJsonProtocol.ByteJsonFormat - implicit val ShortJsonFormat = DefaultJsonProtocol.ShortJsonFormat - implicit val BigDecimalJsonFormat = DefaultJsonProtocol.BigDecimalJsonFormat - implicit val BigIntJsonFormat = DefaultJsonProtocol.BigIntJsonFormat - implicit val UnitJsonFormat = DefaultJsonProtocol.UnitJsonFormat - implicit val BooleanJsonFormat = DefaultJsonProtocol.BooleanJsonFormat - implicit val CharJsonFormat = DefaultJsonProtocol.CharJsonFormat - implicit val StringJsonFormat = DefaultJsonProtocol.StringJsonFormat - implicit val SymbolJsonFormat = DefaultJsonProtocol.SymbolJsonFormat + implicit val IntJsonFormat: JsonFormat[Int] = DefaultJsonProtocol.IntJsonFormat + implicit val FloatJsonFormat: JsonFormat[Float] = DefaultJsonProtocol.FloatJsonFormat + implicit val DoubleJsonFormat: JsonFormat[Double] = DefaultJsonProtocol.DoubleJsonFormat + implicit val ByteJsonFormat: JsonFormat[Byte] = DefaultJsonProtocol.ByteJsonFormat + implicit val ShortJsonFormat: JsonFormat[Short] = DefaultJsonProtocol.ShortJsonFormat + implicit val BigDecimalJsonFormat: JsonFormat[BigDecimal] = DefaultJsonProtocol.BigDecimalJsonFormat + implicit val BigIntJsonFormat: JsonFormat[BigInt] = DefaultJsonProtocol.BigIntJsonFormat + implicit val UnitJsonFormat: JsonFormat[Unit] = DefaultJsonProtocol.UnitJsonFormat + implicit val BooleanJsonFormat: JsonFormat[Boolean] = DefaultJsonProtocol.BooleanJsonFormat + implicit val CharJsonFormat: JsonFormat[Char] = DefaultJsonProtocol.CharJsonFormat + implicit val StringJsonFormat: JsonFormat[String] = DefaultJsonProtocol.StringJsonFormat + implicit val SymbolJsonFormat: JsonFormat[Symbol] = DefaultJsonProtocol.SymbolJsonFormat implicit object BigQueryLongJsonFormat extends JsonFormat[Long] { def write(x: Long) = JsNumber(x) diff --git a/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala b/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala index 4875416f0..2c71e35e2 100644 --- a/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala +++ b/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala @@ -28,9 +28,11 @@ import pekko.stream.connectors.googlecloud.bigquery.model.{ TableDataListResponse, TableListResponse } -import pekko.stream.connectors.googlecloud.bigquery.scaladsl.BigQuery import pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema.BigQuerySchemas._ +import pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema.TableSchemaWriter +import pekko.stream.connectors.googlecloud.bigquery.scaladsl.spray.BigQueryRootJsonFormat import pekko.stream.connectors.googlecloud.bigquery.scaladsl.spray.BigQueryJsonProtocol._ +import pekko.stream.connectors.googlecloud.bigquery.scaladsl.BigQuery import pekko.stream.scaladsl.{ Flow, Sink, Source } import pekko.{ Done, NotUsed } @@ -48,8 +50,8 @@ class BigQueryDoc { // #setup case class Person(name: String, age: Int, addresses: Seq[Address], isHakker: Boolean) case class Address(street: String, city: String, postalCode: Option[Int]) - implicit val addressFormat = bigQueryJsonFormat3(Address) - implicit val personFormat = bigQueryJsonFormat4(Person) + implicit val addressFormat: BigQueryRootJsonFormat[Address] = bigQueryJsonFormat3(Address.apply) + implicit val personFormat: BigQueryRootJsonFormat[Person] = bigQueryJsonFormat4(Person.apply) // #setup @nowarn("msg=dead code") @@ -113,8 +115,8 @@ class BigQueryDoc { // #table-methods // #create-table - implicit val addressSchema = bigQuerySchema3(Address) - implicit val personSchema = bigQuerySchema4(Person) + implicit val addressSchema: TableSchemaWriter[Address] = bigQuerySchema3(Address.apply) + implicit val personSchema: TableSchemaWriter[Person] = bigQuerySchema4(Person.apply) val newTable: Future[Table] = BigQuery.createTable[Person](datasetId, "newTableId") // #create-table diff --git a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/A.scala b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/A.scala index 0009706bb..f79f96c26 100644 --- a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/A.scala +++ b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/A.scala @@ -39,7 +39,7 @@ case class A(integer: Int, long: Long, float: Float, double: Double, string: Str new B(f.get(6).get("v"))) def getInteger = integer - @JsonSerialize(using = classOf[ToStringSerializer]) + @JsonSerialize(`using` = classOf[ToStringSerializer]) def getLong = long def getFloat = float def getDouble = double @@ -74,7 +74,7 @@ case class C(numeric: BigDecimal, date: LocalDate, time: LocalTime, dateTime: Lo LocalDateTime.parse(node.get("f").get(3).get("v").textValue()), Instant.ofEpochMilli((BigDecimal(node.get("f").get(4).get("v").textValue()) * 1000).toLong)) - @JsonSerialize(using = classOf[ToStringSerializer]) + @JsonSerialize(`using` = classOf[ToStringSerializer]) def getNumeric = numeric def getDate = date def getTime = time diff --git a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala index 76e3b046e..a18ad78e5 100644 --- a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala +++ b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/e2e/scaladsl/BigQueryEndToEndSpec.scala @@ -14,12 +14,14 @@ package org.apache.pekko.stream.connectors.googlecloud.bigquery.e2e.scaladsl import org.apache.pekko -import pekko.actor.ActorSystem +import pekko.actor.{ ActorSystem, Scheduler } import pekko.{ pattern, Done } import pekko.stream.connectors.googlecloud.bigquery.HoverflySupport import pekko.stream.connectors.googlecloud.bigquery.e2e.{ A, B, C } import pekko.stream.connectors.googlecloud.bigquery.model.JobState import pekko.stream.connectors.googlecloud.bigquery.model.TableReference +import pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema.TableSchemaWriter +import pekko.stream.connectors.googlecloud.bigquery.scaladsl.spray.BigQueryRootJsonFormat import pekko.testkit.TestKit import io.specto.hoverfly.junit.core.{ HoverflyMode, SimulationSource } import org.scalatest.BeforeAndAfterAll @@ -56,7 +58,7 @@ class BigQueryEndToEndSpec super.afterAll() } - implicit def scheduler = system.scheduler + implicit def scheduler: Scheduler = system.scheduler "BigQuery Scala DSL" should { @@ -66,12 +68,12 @@ class BigQueryEndToEndSpec import pekko.stream.connectors.googlecloud.bigquery.scaladsl.spray.BigQueryJsonProtocol._ import pekko.stream.scaladsl.{ Sink, Source } - implicit val cFormat = bigQueryJsonFormat5(C) - implicit val bFormat = bigQueryJsonFormat3(B) - implicit val aFormat = bigQueryJsonFormat7(A) - implicit val cSchema = bigQuerySchema5(C) - implicit val bSchema = bigQuerySchema3(B) - implicit val aSchema = bigQuerySchema7(A) + implicit val cFormat: BigQueryRootJsonFormat[C] = bigQueryJsonFormat5(C.apply) + implicit val bFormat: BigQueryRootJsonFormat[B] = bigQueryJsonFormat3(B.apply) + implicit val aFormat: BigQueryRootJsonFormat[A] = bigQueryJsonFormat7(A.apply) + implicit val cSchema: TableSchemaWriter[C] = bigQuerySchema5(C.apply) + implicit val bSchema: TableSchemaWriter[B] = bigQuerySchema3(B.apply) + implicit val aSchema: TableSchemaWriter[A] = bigQuerySchema7(A.apply) "create dataset" in { BigQuery.createDataset(datasetId).map { dataset => diff --git a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala index 7051651d9..a02043b99 100644 --- a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala +++ b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryQueriesSpec.scala @@ -94,7 +94,7 @@ class BigQueryQueriesSpec .post(BigQueryEndpoints.queries(settings.projectId).path.toString) .queryParam("prettyPrint", "false") .anyBody() - .willReturn(success(completeQuery.toJson.toString(), "application/json")))) + .willReturn(success(completeQuery.toJson.compactPrint, "application/json")))) query[JsValue]("SQL") .addAttributes(GoogleAttributes.settings(settings)) @@ -111,7 +111,7 @@ class BigQueryQueriesSpec .post(BigQueryEndpoints.queries(settings.projectId).path.toString) .queryParam("prettyPrint", "false") .anyBody() - .willReturn(success(completeQueryWith2ndPage.toJson.toString(), "application/json")) + .willReturn(success(completeQueryWith2ndPage.toJson.compactPrint, "application/json")) .get(BigQueryEndpoints.query(settings.projectId, jobId).path.toString) .queryParam("pageToken", pageToken) .queryParam("prettyPrint", "false") @@ -132,7 +132,7 @@ class BigQueryQueriesSpec .post(BigQueryEndpoints.queries(settings.projectId).path.toString) .queryParam("prettyPrint", "false") .anyBody() - .willReturn(success(incompleteQuery.toJson.toString(), "application/json")) + .willReturn(success(incompleteQuery.toJson.compactPrint, "application/json")) .get(BigQueryEndpoints.query(settings.projectId, jobId).path.toString) .queryParam("prettyPrint", "false") .willReturn(success(completeQuery.toJson.toString, "application/json")))) @@ -152,7 +152,7 @@ class BigQueryQueriesSpec .post(BigQueryEndpoints.queries(settings.projectId).path.toString) .queryParam("prettyPrint", "false") .anyBody() - .willReturn(success(incompleteQuery.toJson.toString(), "application/json")) + .willReturn(success(incompleteQuery.toJson.compactPrint, "application/json")) .get(BigQueryEndpoints.query(settings.projectId, jobId).path.toString) .queryParam("prettyPrint", "false") .willReturn(success(completeQueryWith2ndPage.toJson.toString, "application/json")) @@ -176,7 +176,7 @@ class BigQueryQueriesSpec .post(BigQueryEndpoints.queries(settings.projectId).path.toString) .queryParam("prettyPrint", "false") .anyBody() - .willReturn(success(completeQueryWithoutJobId.toJson.toString(), "application/json")))) + .willReturn(success(completeQueryWithoutJobId.toJson.compactPrint, "application/json")))) query[JsValue]("SQL") .addAttributes(GoogleAttributes.settings(settings)) @@ -204,7 +204,7 @@ class BigQueryQueriesSpec .post(BigQueryEndpoints.queries(settings.projectId).path.toString) .queryParam("prettyPrint", "false") .anyBody() - .willReturn(success(completeQuery.toJson.toString(), "application/json")))) + .willReturn(success(completeQuery.toJson.compactPrint, "application/json")))) recoverToSucceededIf[BrokenParserException] { query[JsValue]("SQL") diff --git a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BigQuerySchemasSpec.scala b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BigQuerySchemasSpec.scala index 0aef83003..a46711188 100644 --- a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BigQuerySchemasSpec.scala +++ b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BigQuerySchemasSpec.scala @@ -50,20 +50,20 @@ class BigQuerySchemasSpec extends AnyWordSpecLike with Matchers { "BigQuerySchemas" should { "correctly generate schema" in { - implicit val cSchemaWriter = bigQuerySchema1(C) - implicit val bSchemaWriter = bigQuerySchema2(B) - val generatedSchema = bigQuerySchema7(A).write + implicit val cSchemaWriter: TableSchemaWriter[C] = bigQuerySchema1(C.apply) + implicit val bSchemaWriter: TableSchemaWriter[B] = bigQuerySchema2(B.apply) + val generatedSchema = bigQuerySchema7(A.apply).write generatedSchema shouldEqual schema } "throw exception when nesting options" in { case class Invalid(invalid: Option[Option[String]]) - assertThrows[IllegalArgumentException](bigQuerySchema1(Invalid).write) + assertThrows[IllegalArgumentException](bigQuerySchema1(Invalid.apply).write) } "throw exception when nesting options inside seqs" in { case class Invalid(invalid: Seq[Option[String]]) - assertThrows[IllegalArgumentException](bigQuerySchema1(Invalid).write) + assertThrows[IllegalArgumentException](bigQuerySchema1(Invalid.apply).write) } } } diff --git a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala index 36dd1d822..1e7a48aff 100644 --- a/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala +++ b/google-cloud-bigquery/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/spray/BigQueryJsonProtocolSpec.scala @@ -59,8 +59,8 @@ class BigQueryJsonProtocolSpec extends BigQueryJsonProtocol with AnyWordSpecLike case class Record(name: Option[String], addresses: Seq[Address]) case class Address(street: Option[String], city: Option[String]) - implicit val addressFormat = bigQueryJsonFormat2(Address) - implicit val recordFormat = bigQueryJsonFormat2(Record) + implicit val addressFormat: BigQueryRootJsonFormat[Address] = bigQueryJsonFormat2(Address.apply) + implicit val recordFormat: BigQueryRootJsonFormat[Record] = bigQueryJsonFormat2(Record.apply) "BigQueryJsonProtocol" should { diff --git a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala index 66b1e27a7..90dd33581 100644 --- a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala +++ b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala @@ -69,7 +69,7 @@ private[pubsub] trait PubSubApi { def PubSubGoogleApisPort: Int def isEmulated: Boolean - private implicit val instantFormat = new RootJsonFormat[Instant] { + private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] { override def read(jsValue: JsValue): Instant = jsValue match { case JsString(time) => Instant.parse(time) case _ => deserializationError("Instant required as a string of RFC3339 UTC Zulu format.") @@ -77,7 +77,7 @@ private[pubsub] trait PubSubApi { override def write(instant: Instant): JsValue = JsString(instant.toString) } - private implicit val pubSubMessageFormat = + private implicit val pubSubMessageFormat: RootJsonFormat[PubSubMessage] = new RootJsonFormat[PubSubMessage] { override def read(json: JsValue): PubSubMessage = { val fields = json.asJsObject.fields @@ -98,7 +98,7 @@ private[pubsub] trait PubSubApi { ++ m.attributes.map(attributes => "attributes" -> attributes.toJson): _*) } - private implicit val publishMessageFormat = new RootJsonFormat[PublishMessage] { + private implicit val publishMessageFormat: RootJsonFormat[PublishMessage] = new RootJsonFormat[PublishMessage] { def read(json: JsValue): PublishMessage = { val data = json.asJsObject.fields("data").convertTo[String] val attributes = json.asJsObject.fields("attributes").convertTo[immutable.Map[String, String]] @@ -112,37 +112,39 @@ private[pubsub] trait PubSubApi { m.attributes.map(a => "attributes" -> a.toJson): _*) } - private implicit val pubSubRequestFormat = new RootJsonFormat[PublishRequest] { + private implicit val pubSubRequestFormat: RootJsonFormat[PublishRequest] = new RootJsonFormat[PublishRequest] { def read(json: JsValue): PublishRequest = PublishRequest(json.asJsObject.fields("messages").convertTo[immutable.Seq[PublishMessage]]) def write(pr: PublishRequest): JsValue = JsObject("messages" -> pr.messages.toJson) } - private implicit val gcePubSubResponseFormat = new RootJsonFormat[PublishResponse] { + private implicit val gcePubSubResponseFormat: RootJsonFormat[PublishResponse] = new RootJsonFormat[PublishResponse] { def read(json: JsValue): PublishResponse = PublishResponse(json.asJsObject.fields("messageIds").convertTo[immutable.Seq[String]]) def write(pr: PublishResponse): JsValue = JsObject("messageIds" -> pr.messageIds.toJson) } - private implicit val receivedMessageFormat = new RootJsonFormat[ReceivedMessage] { + private implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = new RootJsonFormat[ReceivedMessage] { def read(json: JsValue): ReceivedMessage = ReceivedMessage(json.asJsObject.fields("ackId").convertTo[String], json.asJsObject.fields("message").convertTo[PubSubMessage]) def write(rm: ReceivedMessage): JsValue = JsObject("ackId" -> rm.ackId.toJson, "message" -> rm.message.toJson) } - private implicit val pubSubPullResponseFormat = new RootJsonFormat[PullResponse] { + private implicit val pubSubPullResponseFormat: RootJsonFormat[PullResponse] = new RootJsonFormat[PullResponse] { def read(json: JsValue): PullResponse = PullResponse(json.asJsObject.fields.get("receivedMessages").map(_.convertTo[immutable.Seq[ReceivedMessage]])) def write(pr: PullResponse): JsValue = pr.receivedMessages.map(rm => JsObject("receivedMessages" -> rm.toJson)).getOrElse(JsObject.empty) } - private implicit val acknowledgeRequestFormat = new RootJsonFormat[AcknowledgeRequest] { - def read(json: JsValue): AcknowledgeRequest = - AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*) - def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson) - } - private implicit val pullRequestFormat = DefaultJsonProtocol.jsonFormat2(PullRequest.apply) + private implicit val acknowledgeRequestFormat: RootJsonFormat[AcknowledgeRequest] = + new RootJsonFormat[AcknowledgeRequest] { + def read(json: JsValue): AcknowledgeRequest = + AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*) + def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson) + } + private implicit val pullRequestFormat: RootJsonFormat[PullRequest] = + DefaultJsonProtocol.jsonFormat2(PullRequest.apply) private def scheme: String = if (isEmulated) "http" else "https" diff --git a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala index d5bbb9e7e..6228752a1 100644 --- a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala +++ b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApiSpec.scala @@ -60,7 +60,7 @@ class PubSubApiSpec extends AnyFlatSpec with BeforeAndAfterAll with ScalaFutures s"pekko.connectors.google.credentials.none.project-id = ${TestCredentials.projectId}") .withFallback(ConfigFactory.load())) - implicit val defaultPatience = + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 5.seconds, interval = 100.millis) def createInsecureSslEngine(host: String, port: Int): SSLEngine = { diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala index 6ca40022d..ae402473f 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala @@ -90,7 +90,8 @@ object GoogleSettings { } -final case class GoogleSettings @InternalApi private[connectors] (projectId: String, +@InternalApi +final case class GoogleSettings(projectId: String, credentials: Credentials, requestSettings: RequestSettings) { def getProjectId = projectId @@ -134,7 +135,8 @@ object RequestSettings { apply(userIp.toScala, quotaUser.toScala, prettyPrint, chunkSize, retrySettings, forwardProxy.toScala) } -final case class RequestSettings @InternalApi private[connectors] ( +@InternalApi +final case class RequestSettings( userIp: Option[String], quotaUser: Option[String], prettyPrint: Boolean, diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala index 8dbc91cc2..969cef4c1 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala @@ -14,7 +14,6 @@ package org.apache.pekko.stream.connectors.google import org.apache.pekko -import pekko.actor.ActorSystem import pekko.NotUsed import pekko.annotation.InternalApi import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT } diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala index f65b2a534..6bff78f49 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/NoCredentials.scala @@ -32,7 +32,7 @@ private[connectors] object NoCredentials { } @InternalApi -private[auth] final case class NoCredentials private (projectId: String, token: String) extends Credentials { +private[connectors] final case class NoCredentials(projectId: String, token: String) extends Credentials { private val futureToken = Future.successful(OAuth2BearerToken(token)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8006d1e7d..6158ce97b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -195,7 +195,6 @@ object Dependencies { ) ++ Mockito) val GoogleBigQuery = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-jackson" % PekkoHttpVersion % Provided, @@ -221,7 +220,6 @@ object Dependencies { "org.apache.pekko" %% "pekko-discovery" % PekkoVersion) ++ Mockito) val GooglePubSub = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
