This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
commit 2746bf3a92be54896f599bfd8edcde60f771e7a2
Author: Scala Steward <[email protected]>
AuthorDate: Sat Jan 17 16:22:22 2026 +0000

    Reformat with scalafmt 3.10.4

    Executed command: scalafmt --non-interactive
---
 .../connectors/amqp/impl/AmqpRpcFlowStage.scala    |  8 +++--
 .../test/scala/docs/scaladsl/AmqpDocsSpec.scala    |  3 +-
 .../amqp/scaladsl/AmqpConnectorsSpec.scala         |  6 ++--
 ...AmqpGraphStageLogicConnectionShutdownSpec.scala |  5 +--
 .../pekko/stream/connectors/awsspi/TestBase.scala  |  4 +--
 .../stream/connectors/awsspi/sqs/SQSITTest.scala   |  5 +--
 .../test/scala/docs/scaladsl/AzureQueueSpec.scala  |  3 +-
 build.sbt                                          | 36 ++++++++++++++--------
 .../scala/docs/scaladsl/CassandraFlowSpec.scala    |  5 +--
 .../couchbase/javadsl/CouchbaseFlow.scala          |  6 +---
 .../couchbase/scaladsl/CouchbaseFlow.scala         | 11 ++-----
 .../scala/docs/scaladsl/CouchbaseFlowSpec.scala    |  6 +---
 .../test/scala/docs/scaladsl/CsvToMapSpec.scala    | 18 +++++++----
 .../stream/connectors/dynamodb/DynamoDbOp.scala    |  5 +--
 .../pekko/stream/connectors/dynamodb/TestOps.scala |  6 ++--
 .../impl/ElasticsearchSourceStage.scala            | 19 +++++-------
 .../elasticsearch/impl/RestBulkApiV5.scala         | 14 ++++-----
 .../elasticsearch/impl/RestBulkApiV7.scala         |  7 ++---
 .../scaladsl/ElasticsearchConnectorBehaviour.scala |  6 ++--
 .../docs/scaladsl/ElasticsearchSpecUtils.scala     |  6 +---
 .../scala/docs/scaladsl/ElasticsearchV5Spec.scala  | 19 +++++-------
 .../scala/docs/scaladsl/ElasticsearchV7Spec.scala  |  9 ++++--
 .../scaladsl/OpensearchConnectorBehaviour.scala    |  6 ++--
 .../scala/docs/scaladsl/OpensearchV1Spec.scala     | 18 +++++------
 .../file/impl/archive/TarArchiveEntry.scala        |  3 +-
 .../test/scala/docs/scaladsl/TarArchiveSpec.scala  |  3 +-
 .../connectors/ftp/impl/FtpIOGraphStage.scala      |  9 +-----
 .../stream/connectors/ftp/impl/FtpOperations.scala |  3 +-
 .../test/scala/docs/scaladsl/ExampleReader.scala   |  5 +--
 .../storage/scaladsl/BigQueryAvroStorageSpec.scala |  4 +--
 .../storage/scaladsl/BigQueryStorageSpec.scala     |  4 +--
 .../googlecloud/bigquery/BigQueryExt.scala         |  7 +----
 .../googlecloud/bigquery/javadsl/BigQuery.scala    |  4 +--
 .../bigquery/scaladsl/BigQueryJobs.scala           |  5 +--
 .../bigquery/scaladsl/BigQueryTableData.scala      |  5 +--
 .../bigquery/scaladsl/schema/BasicSchemas.scala    |  6 +---
 .../bigquery/scaladsl/schema/JavaTimeSchemas.scala |  5 +--
 .../scaladsl/schema/PrimitiveSchemaWriter.scala    |  4 +--
 .../bigquery/scaladsl/schema/SchemaWriter.scala    |  4 +--
 .../src/test/scala/docs/scaladsl/BigQueryDoc.scala |  9 +----
 .../pubsub/grpc/javadsl/GrpcPublisher.scala        |  7 +----
 .../pubsub/grpc/javadsl/GrpcSubscriber.scala       |  7 +----
 .../pubsub/grpc/scaladsl/GrpcPublisher.scala       |  7 +----
 .../pubsub/grpc/scaladsl/GrpcSubscriber.scala      |  7 +----
 .../connectors/googlecloud/pubsub/model.scala      |  9 ++++--
 .../connectors/googlecloud/pubsub/ModelSpec.scala  | 18 +++++++----
 .../pekko/stream/connectors/google/GoogleExt.scala |  7 +----
 .../stream/connectors/google/util/EitherFlow.scala |  3 +-
 .../connectors/huawei/pushkit/HmsSettingExt.scala  |  7 +----
 .../stream/connectors/ironmq/javadsl/package.scala |  3 +-
 .../jakartams/ConnectionRetrySettings.scala        |  5 +--
 .../connectors/jakartams/SendRetrySettings.scala   |  5 +--
 .../jakartams/impl/JmsMessageReader.scala          |  3 +-
 .../scaladsl/JmsBufferedAckConnectorsSpec.scala    |  3 +-
 .../scala/docs/scaladsl/JmsConnectorsSpec.scala    |  6 ++--
 .../scala/docs/scaladsl/JmsTxConnectorsSpec.scala  |  6 ++--
 .../jakartams/scaladsl/JmsAckConnectorsSpec.scala  |  3 +-
 .../connectors/jms/ConnectionRetrySettings.scala   |  5 +--
 .../stream/connectors/jms/SendRetrySettings.scala  |  5 +--
 .../connectors/jms/impl/JmsMessageReader.scala     |  3 +-
 .../scala/docs/scaladsl/JmsConnectorsSpec.scala    |  6 ++--
 .../scala/docs/scaladsl/JmsTxConnectorsSpec.scala  |  6 ++--
 .../connectors/kinesis/scaladsl/KinesisFlow.scala  |  5 +--
 .../kinesis/scaladsl/KinesisSchedulerSource.scala  |  4 +--
 .../kinesis/KinesisSchedulerSourceSpec.scala       | 13 ++++----
 .../connectors/kinesis/ShardSettingsSpec.scala     |  3 +-
 .../connectors/mongodb/javadsl/MongoFlow.scala     |  6 +---
 .../connectors/mongodb/javadsl/MongoSink.scala     |  6 +---
 .../connectors/mongodb/scaladsl/MongoSink.scala    |  6 +---
 .../test/scala/docs/scaladsl/MongoSinkSpec.scala   |  6 ++--
 .../mqtt/streaming/javadsl/MqttSession.scala       |  6 ++--
 .../stream/connectors/mqtt/streaming/model.scala   |  9 ++----
 .../mqtt/streaming/scaladsl/MqttSession.scala      |  3 +-
 .../test/scala/docs/scaladsl/MqttCodecSpec.scala   |  3 +-
 .../test/scala/docs/scaladsl/MqttSessionSpec.scala | 11 +++----
 .../connectors/mqtt/impl/MqttFlowStage.scala       | 12 ++------
 .../test/scala/docs/scaladsl/OrientDbSpec.scala    |  5 +--
 .../pravega/impl/PravegaTableSource.scala          |  5 +--
 .../scala/docs/scaladsl/PravegaReadWriteDocs.scala |  6 +---
 .../scala/docs/scaladsl/PravegaSettingsSpec.scala  |  6 +---
 project/Common.scala                               | 14 +++++----
 .../stream/connectors/reference/Resource.scala     |  7 +----
 .../connectors/reference/javadsl/Reference.scala   |  5 +--
 .../connectors/reference/scaladsl/Reference.scala  |  4 +--
 .../stream/connectors/s3/impl/HttpRequests.scala   |  7 +----
 .../stream/connectors/s3/impl/Marshalling.scala    | 26 ++++++----------
 .../connectors/s3/impl/auth/CanonicalRequest.scala |  3 +-
 s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala   |  5 +--
 .../stream/connectors/s3/S3ExceptionSpec.scala     |  9 ++++--
 .../stream/connectors/s3/S3SettingsSpec.scala      |  9 ++++--
 .../connectors/s3/impl/MarshallingSpec.scala       |  3 +-
 .../connectors/s3/impl/SplitAfterSizeSpec.scala    |  3 +-
 .../impl/auth/SplitAfterSizeWithContextSpec.scala  |  3 +-
 .../connectors/s3/scaladsl/S3IntegrationSpec.scala |  6 ++--
 .../connectors/sqs/javadsl/SqsPublishFlow.scala    |  6 +---
 .../connectors/sqs/scaladsl/SqsAckFlow.scala       |  6 ++--
 .../connectors/sqs/scaladsl/SqsPublishFlow.scala   |  3 +-
 .../connectors/sqs/testkit/MessageFactory.scala    |  4 +--
 .../test/scala/docs/scaladsl/SqsPublishSpec.scala  |  6 ++--
 .../test/scala/docs/scaladsl/SqsSourceSpec.scala   | 12 +++----
 .../impl/UnixDomainSocketImpl.scala                |  4 +--
 101 files changed, 294 insertions(+), 408 deletions(-)
diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
index 4c5b453eb..e2b186336 100644
--- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
+++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
@@ -74,7 +74,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
           try {
             channel.basicAck(deliveryTag, multiple)
             unackedMessages -= 1
-            if (unackedMessages == 0 && (isClosed(out) || (isClosed(
+            if (unackedMessages == 0 &&
+              (isClosed(out) ||
+                (isClosed(
                   in) && queue.isEmpty && outstandingMessages == 0)))
               completeStage()
             promise.complete(Success(Done))
@@ -88,7 +90,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
           try {
             channel.basicNack(deliveryTag, multiple, requeue)
             unackedMessages -= 1
-            if (unackedMessages == 0 && (isClosed(out) || (isClosed(
+            if (unackedMessages == 0 &&
+              (isClosed(out) ||
+                (isClosed(
                   in) && queue.isEmpty && outstandingMessages == 0)))
               completeStage()
             promise.complete(Success(Done))
diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
index d0956f465..3220ba8c4 100644
--- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
+++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
@@ -179,7 +179,8 @@ class AmqpDocsSpec extends AmqpSpec {
       mergingFlow.shutdown()
     }

-    "publish and consume elements through a simple queue again in the same JVM without autoAck" in assertAllStagesStopped {
+    "publish and consume elements through a simple queue again in the same JVM without autoAck" in
+      assertAllStagesStopped {
       val queueName = "amqp-conn-it-spec-no-auto-ack-" + System.currentTimeMillis()
       val queueDeclaration = QueueDeclaration(queueName)

diff --git a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
index bb038874a..6505b24f7 100644
--- a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
+++ b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -84,7 +84,8 @@ class AmqpConnectorsSpec extends AmqpSpec with ScalaCheckDrivenPropertyChecks {
       }
     }

-    "publish via RPC which expects 2 responses per message and then consume through a simple queue again in the same JVM" in assertAllStagesStopped {
+    "publish via RPC which expects 2 responses per message and then consume through a simple queue again in the same JVM" in
+      assertAllStagesStopped {
       forAll { (reuseByteArray: Boolean) =>
         val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis()
         val queueDeclaration = QueueDeclaration(queueName)
@@ -338,7 +339,8 @@ class AmqpConnectorsSpec extends AmqpSpec with ScalaCheckDrivenPropertyChecks {
      }
     }

-    "publish via RPC and then consume through a simple queue again in the same JVM without autoAck" in assertAllStagesStopped {
+    "publish via RPC and then consume through a simple queue again in the same JVM without autoAck" in
+      assertAllStagesStopped {
       forAll { (reuseByteArray: Boolean) =>

         val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis()
diff --git a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
index ae249678f..743581bc5 100644
--- a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
+++ b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
@@ -19,10 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger

 import org.apache.pekko
 import pekko.actor.ActorSystem
 import pekko.stream.connectors.amqp.{
-  AmqpCachedConnectionProvider,
-  AmqpConnectionFactoryConnectionProvider,
-  AmqpProxyConnection,
-  AmqpWriteSettings,
+  AmqpCachedConnectionProvider, AmqpConnectionFactoryConnectionProvider, AmqpProxyConnection, AmqpWriteSettings,
   QueueDeclaration
 }
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
diff --git a/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala b/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
index d0494ad9a..baff80bea 100644
--- a/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
+++ b/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/TestBase.scala
@@ -18,9 +18,7 @@ package org.apache.pekko.stream.connectors.awsspi

 import software.amazon.awssdk.auth.credentials.{
-  AwsCredentialsProviderChain,
-  EnvironmentVariableCredentialsProvider,
-  ProfileCredentialsProvider,
+  AwsCredentialsProviderChain, EnvironmentVariableCredentialsProvider, ProfileCredentialsProvider,
   SystemPropertyCredentialsProvider
 }
 import software.amazon.awssdk.regions.Region
diff --git a/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala b/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
index 4aa7f562e..c986c79e0 100644
--- a/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
+++ b/aws-spi-pekko-http-int-tests/src/test/scala/org/apache/pekko/stream/connectors/awsspi/sqs/SQSITTest.scala
@@ -23,10 +23,7 @@ import org.scalatest.wordspec.AnyWordSpec
 import org.scalatest.matchers.should.Matchers
 import software.amazon.awssdk.services.sqs.SqsAsyncClient
 import software.amazon.awssdk.services.sqs.model.{
-  CreateQueueRequest,
-  DeleteQueueRequest,
-  ReceiveMessageRequest,
-  SendMessageRequest
+  CreateQueueRequest, DeleteQueueRequest, ReceiveMessageRequest, SendMessageRequest
 }

 import scala.util.Random
diff --git a/azure-storage-queue/src/test/scala/docs/scaladsl/AzureQueueSpec.scala b/azure-storage-queue/src/test/scala/docs/scaladsl/AzureQueueSpec.scala
index b99c54073..5af63948f 100644
--- a/azure-storage-queue/src/test/scala/docs/scaladsl/AzureQueueSpec.scala
+++ b/azure-storage-queue/src/test/scala/docs/scaladsl/AzureQueueSpec.scala
@@ -99,7 +99,8 @@ class AzureQueueSpec extends TestKit(ActorSystem()) with AsyncFlatSpecLike with
     futureAssertion
   }

-  it should "observe batchSize and not pull too many message in from the CouldQueue into the buffer" in assertAllStagesStopped {
+  it should "observe batchSize and not pull too many message in from the CouldQueue into the buffer" in
+    assertAllStagesStopped {
    val msgs = (1 to 20).map(_ => queueTestMsg)
    msgs.foreach(m => queue.addMessage(m))
diff --git a/build.sbt b/build.sbt
index 1f47b8772..d21f22fcb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -405,13 +405,18 @@ lazy val docs = project
         "scaladoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/api/pekko/${Dependencies.PekkoBinaryVersion}",
         "javadoc.org.apache.pekko.base_url" -> s"https://pekko.apache.org/japi/pekko/${Dependencies.PekkoBinaryVersion}/",
         "javadoc.org.apache.pekko.link_style" -> "direct",
-        "extref.pekko-http.base_url" -> s"https://pekko.apache.org/docs/pekko-http/${Dependencies.PekkoHttpBinaryVersion}/%s",
-        "scaladoc.org.apache.pekko.http.base_url" -> s"https://pekko.apache.org/api/pekko-http/${Dependencies.PekkoHttpBinaryVersion}/",
-        "javadoc.org.apache.pekko.http.base_url" -> s"https://pekko.apache.org/japi/pekko-http/${Dependencies.PekkoHttpBinaryVersion}/",
+        "extref.pekko-http.base_url" ->
+          s"https://pekko.apache.org/docs/pekko-http/${Dependencies.PekkoHttpBinaryVersion}/%s",
+        "scaladoc.org.apache.pekko.http.base_url" ->
+          s"https://pekko.apache.org/api/pekko-http/${Dependencies.PekkoHttpBinaryVersion}/",
+        "javadoc.org.apache.pekko.http.base_url" ->
+          s"https://pekko.apache.org/japi/pekko-http/${Dependencies.PekkoHttpBinaryVersion}/",
         // Pekko gRPC
         "pekko-grpc.version" -> Dependencies.PekkoGrpcBinaryVersion,
-        "extref.pekko-grpc.base_url" -> s"https://pekko.apache.org/docs/pekko-grpc/${Dependencies.PekkoGrpcBinaryVersion}/%s",
-        "scaladoc.org.apache.pekko.gprc.base_url" -> s"https://pekko.apache.org/api/pekko-grpc/${Dependencies.PekkoGrpcBinaryVersion}/org/apache/pekko/gprc",
+        "extref.pekko-grpc.base_url" ->
+          s"https://pekko.apache.org/docs/pekko-grpc/${Dependencies.PekkoGrpcBinaryVersion}/%s",
+        "scaladoc.org.apache.pekko.gprc.base_url" ->
+          s"https://pekko.apache.org/api/pekko-grpc/${Dependencies.PekkoGrpcBinaryVersion}/org/apache/pekko/gprc",
         // Couchbase
         "couchbase.version" -> Dependencies.CouchbaseVersion,
         "extref.couchbase.base_url" -> s"https://docs.couchbase.com/java-sdk/${Dependencies.CouchbaseVersionForDocs}/%s",
@@ -424,25 +429,32 @@ lazy val docs = project
         "extref.slick.base_url" -> s"https://scala-slick.org/doc/${Dependencies.SlickVersion}/%s",
         // Cassandra
         "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
-        "extref.cassandra-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.CassandraDriverVersionInDocs}/%s",
-        "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.CassandraDriverVersionInDocs}/",
+        "extref.cassandra-driver.base_url" ->
+          s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.CassandraDriverVersionInDocs}/%s",
+        "javadoc.com.datastax.oss.base_url" ->
+          s"https://docs.datastax.com/en/drivers/java/${Dependencies.CassandraDriverVersionInDocs}/",
         // Solr
         "extref.solr.base_url" -> s"https://solr.apache.org/guide/${Dependencies.SolrVersionForDocs}/%s",
-        "javadoc.org.apache.solr.base_url" -> s"https://solr.apache.org/docs/${Dependencies.SolrVersionForDocs}_0/solr-solrj/",
+        "javadoc.org.apache.solr.base_url" ->
+          s"https://solr.apache.org/docs/${Dependencies.SolrVersionForDocs}_0/solr-solrj/",
         // Java
         "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
         "javadoc.javax.jms.base_url" -> "https://docs.oracle.com/javaee/7/api/",
-        "javadoc.com.couchbase.base_url" -> s"https://docs.couchbase.com/sdk-api/couchbase-java-client-${Dependencies.CouchbaseVersion}/",
+        "javadoc.com.couchbase.base_url" ->
+          s"https://docs.couchbase.com/sdk-api/couchbase-java-client-${Dependencies.CouchbaseVersion}/",
         "javadoc.io.pravega.base_url" -> s"http://pravega.io/docs/${Dependencies.PravegaVersionForDocs}/javadoc/clients/",
         "javadoc.org.apache.kudu.base_url" -> s"https://kudu.apache.org/releases/${Dependencies.KuduVersion}/apidocs/",
         "javadoc.org.apache.hadoop.base_url" -> s"https://hadoop.apache.org/docs/r${Dependencies.HadoopVersion}/api/",
         "javadoc.software.amazon.awssdk.base_url" -> "https://sdk.amazonaws.com/java/api/latest/",
-        "javadoc.com.google.auth.base_url" -> "https://www.javadoc.io/doc/com.google.auth/google-auth-library-credentials/latest/",
+        "javadoc.com.google.auth.base_url" ->
+          "https://www.javadoc.io/doc/com.google.auth/google-auth-library-credentials/latest/",
         "javadoc.com.google.auth.link_style" -> "direct",
-        "javadoc.com.fasterxml.jackson.annotation.base_url" -> "https://javadoc.io/doc/com.fasterxml.jackson.core/jackson-annotations/latest/",
+        "javadoc.com.fasterxml.jackson.annotation.base_url" ->
+          "https://javadoc.io/doc/com.fasterxml.jackson.core/jackson-annotations/latest/",
         "javadoc.com.fasterxml.jackson.annotation.link_style" -> "direct",
         // Scala
-        "scaladoc.spray.json.base_url" -> s"https://javadoc.io/doc/io.spray/spray-json_${scalaBinaryVersion.value}/latest/",
+        "scaladoc.spray.json.base_url" ->
+          s"https://javadoc.io/doc/io.spray/spray-json_${scalaBinaryVersion.value}/latest/",
         // Eclipse Paho client for MQTT
         "javadoc.org.eclipse.paho.client.mqttv3.base_url" -> "https://www.eclipse.org/paho/files/javadoc/",
         "javadoc.org.eclipse.paho.mqttv5.client.base_url" -> "https://www.eclipse.org/paho/files/javadoc/",
diff --git a/cassandra/src/test/scala/docs/scaladsl/CassandraFlowSpec.scala b/cassandra/src/test/scala/docs/scaladsl/CassandraFlowSpec.scala
index 3ba096514..11ad2b151 100644
--- a/cassandra/src/test/scala/docs/scaladsl/CassandraFlowSpec.scala
+++ b/cassandra/src/test/scala/docs/scaladsl/CassandraFlowSpec.scala
@@ -18,10 +18,7 @@ import pekko.{ Done, NotUsed }
 import pekko.actor.ActorSystem
 import pekko.stream.connectors.cassandra.{ CassandraSessionSettings, CassandraWriteSettings }
 import pekko.stream.connectors.cassandra.scaladsl.{
-  CassandraFlow,
-  CassandraSession,
-  CassandraSource,
-  CassandraSpecBase
+  CassandraFlow, CassandraSession, CassandraSource, CassandraSpecBase
 }
 import pekko.stream.scaladsl.{ Sink, Source, SourceWithContext }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
index aa267cdab..525d8d1e2 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
@@ -16,11 +16,7 @@ package org.apache.pekko.stream.connectors.couchbase.javadsl
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream.connectors.couchbase.{
-  scaladsl,
-  CouchbaseDeleteResult,
-  CouchbaseSessionSettings,
-  CouchbaseWriteResult,
-  CouchbaseWriteSettings
+  scaladsl, CouchbaseDeleteResult, CouchbaseSessionSettings, CouchbaseWriteResult, CouchbaseWriteSettings
 }
 import pekko.stream.javadsl.Flow
 import com.couchbase.client.java.document.{ Document, JsonDocument }
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
index ad35b1cee..823595c07 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
@@ -16,15 +16,8 @@ package org.apache.pekko.stream.connectors.couchbase.scaladsl
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream.connectors.couchbase.{
-  CouchbaseDeleteFailure,
-  CouchbaseDeleteResult,
-  CouchbaseDeleteSuccess,
-  CouchbaseSessionRegistry,
-  CouchbaseSessionSettings,
-  CouchbaseWriteFailure,
-  CouchbaseWriteResult,
-  CouchbaseWriteSettings,
-  CouchbaseWriteSuccess
+  CouchbaseDeleteFailure, CouchbaseDeleteResult, CouchbaseDeleteSuccess, CouchbaseSessionRegistry,
+  CouchbaseSessionSettings, CouchbaseWriteFailure, CouchbaseWriteResult, CouchbaseWriteSettings, CouchbaseWriteSuccess
 }
 import pekko.stream.scaladsl.Flow
 import com.couchbase.client.java.document.{ Document, JsonDocument }
diff --git a/couchbase/src/test/scala/docs/scaladsl/CouchbaseFlowSpec.scala b/couchbase/src/test/scala/docs/scaladsl/CouchbaseFlowSpec.scala
index
 95254c2d7..8bb7dd6f2 100644
--- a/couchbase/src/test/scala/docs/scaladsl/CouchbaseFlowSpec.scala
+++ b/couchbase/src/test/scala/docs/scaladsl/CouchbaseFlowSpec.scala
@@ -16,11 +16,7 @@ package docs.scaladsl
 import org.apache.pekko
 import pekko.Done
 import pekko.stream.connectors.couchbase.{
-  CouchbaseDeleteFailure,
-  CouchbaseDeleteResult,
-  CouchbaseWriteFailure,
-  CouchbaseWriteResult,
-  CouchbaseWriteSettings
+  CouchbaseDeleteFailure, CouchbaseDeleteResult, CouchbaseWriteFailure, CouchbaseWriteResult, CouchbaseWriteSettings
 }
 import pekko.stream.connectors.couchbase.scaladsl.CouchbaseFlow
 import pekko.stream.connectors.couchbase.testing.{ CouchbaseSupport, TestObject }
diff --git a/csv/src/test/scala/docs/scaladsl/CsvToMapSpec.scala b/csv/src/test/scala/docs/scaladsl/CsvToMapSpec.scala
index 55d956015..1a8d2a414 100644
--- a/csv/src/test/scala/docs/scaladsl/CsvToMapSpec.scala
+++ b/csv/src/test/scala/docs/scaladsl/CsvToMapSpec.scala
@@ -196,7 +196,8 @@ class CsvToMapSpec extends CsvSpec {
       // #column-names
     }

-    "parse header and decode data line. Be OK with more headers column than data (including the header in the result)" in assertAllStagesStopped {
+    "parse header and decode data line. Be OK with more headers column than data (including the header in the result)" in
+      assertAllStagesStopped {
       // #header-line
       import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap }
@@ -225,7 +226,8 @@ class CsvToMapSpec extends CsvSpec {
       // #header-line
     }

-    "parse header and decode data line. Be OK when there are more data than header column, set a default header in the result" in assertAllStagesStopped {
+    "parse header and decode data line. Be OK when there are more data than header column, set a default header in the result" in
+      assertAllStagesStopped {
       // #header-line
       import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap }
@@ -254,7 +256,8 @@ class CsvToMapSpec extends CsvSpec {
       // #header-line
     }

-    "parse header and decode data line. Be OK when there are more data than header column, set the user configured header in the result" in assertAllStagesStopped {
+    "parse header and decode data line. Be OK when there are more data than header column, set the user configured header in the result" in
+      assertAllStagesStopped {
       // #header-line
       import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap }
@@ -283,7 +286,8 @@ class CsvToMapSpec extends CsvSpec {
       // #header-line
     }

-    "parse header and decode data line. Be OK when there are more headers than data column, set the user configured field value in the result" in assertAllStagesStopped {
+    "parse header and decode data line. Be OK when there are more headers than data column, set the user configured field value in the result" in
+      assertAllStagesStopped {
       // #header-line
       import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap }
@@ -383,7 +387,8 @@ class CsvToMapSpec extends CsvSpec {
       // #header-line
     }

-    "be OK when there are more data than header column, set the user configured header in the result" in assertAllStagesStopped {
+    "be OK when there are more data than header column, set the user configured header in the result" in
+      assertAllStagesStopped {
       // #header-line
       import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap }
@@ -412,7 +417,8 @@ class CsvToMapSpec extends CsvSpec {
       // #header-line
     }

-    "be OK when there are more headers than data column, set the user configured field value in the result" in assertAllStagesStopped {
+    "be OK when there are more headers than data column, set the user configured field value in the result" in
+      assertAllStagesStopped {
       // #header-line
       import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap }
diff --git a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/DynamoDbOp.scala b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/DynamoDbOp.scala
index 6b69fcd58..629a2266c 100644
--- a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/DynamoDbOp.scala
+++ b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/DynamoDbOp.scala
@@ -20,10 +20,7 @@ import software.amazon.awssdk.core.async.SdkPublisher
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
 import software.amazon.awssdk.services.dynamodb.model._
 import software.amazon.awssdk.services.dynamodb.paginators.{
-  BatchGetItemPublisher,
-  ListTablesPublisher,
-  QueryPublisher,
-  ScanPublisher
+  BatchGetItemPublisher, ListTablesPublisher, QueryPublisher, ScanPublisher
 }

 import scala.concurrent.Future
diff --git a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TestOps.scala b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TestOps.scala
index bf41666b9..76b1f1f89 100644
--- a/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TestOps.scala
+++ b/dynamodb/src/test/scala/org/apache/pekko/stream/connectors/dynamodb/TestOps.scala
@@ -31,7 +31,8 @@ trait TestOps {
       sortCol -> N(sort))

   def keyEQ(hash: String): Map[String, Condition] = Map(
-    keyCol -> Condition
+    keyCol ->
+      Condition
       .builder()
       .comparisonOperator(ComparisonOperator.EQ)
       .attributeValueList(S(hash))
@@ -111,7 +112,8 @@ abstract class ItemSpecOps extends TestOps {
         WriteRequest
           .builder()
           .putRequest(
-            PutRequest.builder().item((keyMap(i.toString, i) + ("data1" -> S(
+            PutRequest.builder().item((keyMap(i.toString, i) +
+              ("data1" -> S(
                 "0123456789" * 39000))).asJava).build())
           .build()
       }.asJava).asJava)
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
index c5b2207ce..2d74b8d53 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
@@ -20,11 +20,7 @@ import pekko.http.scaladsl.model.Uri.Path
 import pekko.http.scaladsl.model._
 import pekko.http.scaladsl.unmarshalling.Unmarshal
 import
 pekko.stream.connectors.elasticsearch.{
-  ApiVersion,
-  ElasticsearchParams,
-  OpensearchApiVersion,
-  ReadResult,
-  SourceSettingsBase
+  ApiVersion, ElasticsearchParams, OpensearchApiVersion, ReadResult, SourceSettingsBase
 }
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, StageLogging }
 import pekko.stream.{ Attributes, Materializer, Outlet, SourceShape }
@@ -145,12 +141,13 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
     val queryParams = baseMap ++ routingQueryParam ++ sortQueryParam
     val completeParams = searchParams ++ extraParams.flatten - "routing"

-    val searchBody = "{" + completeParams
-      .map {
-        case (name, json) =>
-          "\"" + name + "\":" + json
-      }
-      .mkString(",") + "}"
+    val searchBody = "{" +
+      completeParams
+        .map {
+          case (name, json) =>
+            "\"" + name + "\":" + json
+        }
+        .mkString(",") + "}"

     val endpoint: String = settings.apiVersion match {
       case ApiVersion.V5 => s"/${elasticsearchParams.indexName}/${elasticsearchParams.typeName.get}/_search"
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala
index 30cc12a5b..f6514bc7a 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala
@@ -50,16 +50,14 @@ private[impl] final class RestBulkApiV5[T, C](indexName: String,
         "index" -> JsObject(sharedFields ++ fields: _*)
       case Create => "create" -> JsObject(sharedFields ++ optionalString("_id", message.id): _*)
       case Update | Upsert =>
-        val fields =
-          ("_id" -> JsString(message.id.get)) +: Seq(
-            optionalNumber("_version", message.version),
-            optionalString("version_type", versionType)).flatten
+        val fields = ("_id" -> JsString(message.id.get)) +: Seq(
+          optionalNumber("_version", message.version),
+          optionalString("version_type", versionType)).flatten
         "update" -> JsObject(sharedFields ++ fields: _*)
       case Delete =>
-        val fields =
-          ("_id" -> JsString(message.id.get)) +: Seq(
-            optionalNumber("_version", message.version),
-            optionalString("version_type", versionType)).flatten
+        val fields = ("_id" -> JsString(message.id.get)) +: Seq(
+          optionalNumber("_version", message.version),
+          optionalString("version_type", versionType)).flatten
         "delete" -> JsObject(sharedFields ++ fields: _*)
       case Nop => "" -> JsObject()
     }
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala
index caa790d51..214b8d7dd 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala
@@ -48,10 +48,9 @@ private[impl] final class RestBulkApiV7[T, C](indexName: String,
       case Create => "create" -> JsObject(sharedFields ++ optionalString("_id", message.id): _*)
       case Update | Upsert => "update" -> JsObject(sharedFields :+ ("_id" -> JsString(message.id.get)): _*)
       case Delete =>
-        val fields =
-          ("_id" -> JsString(message.id.get)) +: Seq(
-            optionalNumber("version", message.version),
-            optionalString("version_type", versionType)).flatten
+        val fields = ("_id" -> JsString(message.id.get)) +: Seq(
+          optionalNumber("version", message.version),
+          optionalString("version_type", versionType)).flatten
         "delete" -> JsObject(sharedFields ++ fields: _*)
       case Nop => "" -> JsObject()
     }
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala
index 1ad8dab0e..56ce918c2 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala
@@ -299,7 +299,8 @@ trait ElasticsearchConnectorBehaviour {
       writeResults.map(_._2) should contain theSameElementsInOrderAs Seq(0, 1, 2)

       val (failed, _) = writeResults.filter(!_._1.success).head
-      failed.message shouldBe WriteMessage
+      failed.message shouldBe
+        WriteMessage
         .createIndexMessage("1", JsObject("subject" -> "Akka Concurrency".toJson))
         .withPassThrough(1)
       failed.errorReason shouldBe Some(
@@ -623,7 +624,8 @@ trait ElasticsearchConnectorBehaviour {

       // sort: _doc is by design an undefined order and is non-deterministic
       // we cannot check a specific order of values
-      result should contain theSameElementsAs (List("Akka in Action",
+      result should contain theSameElementsAs
+        (List("Akka in Action",
         "Programming in Scala",
         "Learning Scala",
         "Scala for Spark in Production",
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
index 2c67de274..fe8ca901f 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
@@ -20,11 +20,7 @@ import pekko.http.scaladsl.model.Uri.Path
 import pekko.http.scaladsl.model.{ ContentTypes, HttpMethods, HttpRequest, Uri }
 import pekko.stream.connectors.elasticsearch.scaladsl.ElasticsearchSource
 import pekko.stream.connectors.elasticsearch.{
-  ApiVersionBase,
-  ElasticsearchConnectionSettings,
-  ElasticsearchParams,
-  OpensearchApiVersion,
-  OpensearchParams,
+  ApiVersionBase, ElasticsearchConnectionSettings, ElasticsearchParams, OpensearchApiVersion, OpensearchParams,
   SourceSettingsBase
 }
 import pekko.stream.scaladsl.Sink
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala
index 7bc0bb25b..731ec0e3b 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala
@@ -18,14 +18,8 @@ import pekko.http.scaladsl.model.{ HttpMethods, HttpRequest, Uri }
 import pekko.http.scaladsl.model.Uri.Path
 import pekko.stream.connectors.elasticsearch.scaladsl.{ ElasticsearchFlow, ElasticsearchSink, ElasticsearchSource }
 import pekko.stream.connectors.elasticsearch.{
-  ApiVersion,
-  ElasticsearchConnectionSettings,
-  ElasticsearchSourceSettings,
-  ElasticsearchWriteSettings,
-  ReadResult,
-  StringMessageWriter,
-  WriteMessage,
-  WriteResult
+  ApiVersion, ElasticsearchConnectionSettings, ElasticsearchSourceSettings, ElasticsearchWriteSettings, ReadResult,
+  StringMessageWriter, WriteMessage, WriteResult
 }
 import pekko.stream.scaladsl.{ Sink, Source }
 import pekko.testkit.TestKit
@@ -225,7 +219,8 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2)

       readTitlesFrom(ApiVersion.V5, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .map(_.book.title)
     }
@@ -277,7 +272,8 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2)

       readTitlesFrom(ApiVersion.V5, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .map(_.book.title)
     }
@@ -334,7 +330,8 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2, 3, 4, 5)
       readTitlesFrom(ApiVersion.V5, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .filterNot(_.book.shouldSkip.getOrElse(false))
           .map(_.book.title)
     }
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala
index bccd5afeb..461b8535e 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV7Spec.scala
@@ -212,7 +212,8 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2)

       readTitlesFrom(ApiVersion.V7, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .map(_.book.title)
     }
@@ -263,7 +264,8 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2)

       readTitlesFrom(ApiVersion.V7, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .map(_.book.title)
     }
@@ -319,7+321,8 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2, 3, 4, 5)
       readTitlesFrom(ApiVersion.V7, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .filterNot(_.book.shouldSkip.getOrElse(false))
           .map(_.book.title)
     }
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
index 345825d08..4e2755391 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
@@ -299,7 +299,8 @@ trait OpensearchConnectorBehaviour {
       writeResults.map(_._2) should contain theSameElementsInOrderAs Seq(0, 1, 2)

       val (failed, _) = writeResults.filter(!_._1.success).head
-      failed.message shouldBe WriteMessage
+      failed.message shouldBe
+        WriteMessage
         .createIndexMessage("1", JsObject("subject" -> "Akka Concurrency".toJson))
         .withPassThrough(1)
       failed.errorReason shouldBe Some(
@@ -623,7 +624,8 @@ trait OpensearchConnectorBehaviour {

       // sort: _doc is by design an undefined order and is non-deterministic
       // we cannot check a specific order of values
-      result should contain theSameElementsAs (List("Akka in Action",
+      result should contain theSameElementsAs
+        (List("Akka in Action",
         "Programming in Scala",
         "Learning Scala",
         "Scala for Spark in Production",
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala
index 81a093399..6eb3c1111 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala
@@ -17,13 +17,8 @@ import org.apache.pekko
 import pekko.http.scaladsl.model.Uri.Path
 import pekko.http.scaladsl.model.{ HttpMethods, HttpRequest, Uri }
 import pekko.stream.connectors.elasticsearch.{
-  ElasticsearchConnectionSettings,
-  OpensearchApiVersion,
-  OpensearchConnectionSettings,
-  ReadResult,
-  StringMessageWriter,
-  WriteMessage,
-  WriteResult
+  ElasticsearchConnectionSettings, OpensearchApiVersion, OpensearchConnectionSettings, ReadResult, StringMessageWriter,
+  WriteMessage, WriteResult
 }
 import pekko.stream.connectors.elasticsearch.scaladsl.{ ElasticsearchFlow, ElasticsearchSink, ElasticsearchSource }
 import pekko.stream.connectors.elasticsearch._
@@ -230,7 +225,8 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2)

       readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .map(_.book.title)
     }
@@ -281,7 +277,8 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2)

       readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .map(_.book.title)
     }
@@ -337,7 +334,8 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils
       // Make sure all messages was committed to kafka
       committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2, 3, 4, 5)
       readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings,
-        indexName).futureValue.toList should contain allElementsOf messagesFromKafka
+        indexName).futureValue.toList should contain allElementsOf
+        messagesFromKafka
           .filterNot(_.book.shouldSkip.getOrElse(false))
           .map(_.book.title)
     }
diff --git a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveEntry.scala b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveEntry.scala
index 2ff3bde4d..cacc55d4e 100644
--- a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveEntry.scala
+++ b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveEntry.scala
@@ -141,7 +141,8 @@ import pekko.util.ByteString
     val fileNamePrefix = getString(
       bs,
       fileNameLength + fileModeLength + ownerIdLength
 + groupIdLength + fileSizeLength +
-      lastModificationLength + headerChecksumLength + linkIndicatorLength + linkFileNameLength + ustarIndicatorLength + ustarVersionLength + ownerNameLength + groupNameLength + deviceMajorNumberLength + deviceMinorNumberLength,
+      lastModificationLength + headerChecksumLength + linkIndicatorLength + linkFileNameLength + ustarIndicatorLength +
+      ustarVersionLength + ownerNameLength + groupNameLength + deviceMajorNumberLength + deviceMinorNumberLength,
       fileNamePrefixLength)
     TarArchiveMetadata(fileNamePrefix, filename, size, lastModification, linkIndicatorByte)
   }
diff --git a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
index 3963077e3..ad7035674 100644
--- a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
@@ -336,7 +336,8 @@ class TarArchiveSpec
         .runWith(Sink.ignore)
       val error = tar.failed.futureValue
       error shouldBe a[TarReaderException]
-      error.getMessage shouldBe "The tar content source was not subscribed to within 5000 milliseconds, it must be subscribed to to progress tar file reading."
+      error.getMessage shouldBe
+        "The tar content source was not subscribed to within 5000 milliseconds, it must be subscribed to to progress tar file reading."
     }
   }

diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
index b26f17426..7059632b3 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala
@@ -18,14 +18,7 @@ import org.apache.pekko
 import pekko.stream.impl.Stages.DefaultAttributes.IODispatcher
 import pekko.stream.stage.{ GraphStageWithMaterializedValue, InHandler, OutHandler }
 import pekko.stream.{
-  Attributes,
-  IOOperationIncompleteException,
-  IOResult,
-  Inlet,
-  Outlet,
-  Shape,
-  SinkShape,
-  SourceShape
+  Attributes, IOOperationIncompleteException, IOResult, Inlet, Outlet, Shape, SinkShape, SourceShape
 }
 import pekko.util.ByteString
 import pekko.util.ByteString.ByteString1C
diff --git a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala
index de0990fd0..c4d7bef60 100644
--- a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala
+++ b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpOperations.scala
@@ -37,7 +37,8 @@ private[ftp] trait FtpOperations extends CommonFtpOperations { self: FtpLike[FTP
     } catch {
       case e: java.net.ConnectException =>
         throw new java.net.ConnectException(
-          e.getMessage + s" host=[${connectionSettings.host}], port=${connectionSettings.port} ${connectionSettings.proxy
+          e.getMessage +
+          s" host=[${connectionSettings.host}], port=${connectionSettings.port} ${connectionSettings.proxy
             .map("proxy=" + _.toString)
             .getOrElse("")}")
     }
diff --git a/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala b/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala
index 92a6d74d6..957343530 100644
--- a/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala
+++ b/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala
@@ -18,10 +18,7 @@ import pekko.actor.ActorSystem
 import pekko.http.scaladsl.unmarshalling.FromByteStringUnmarshaller
 import pekko.stream.connectors.googlecloud.bigquery.storage.{ BigQueryRecord, BigQueryStorageSettings }
 import pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl.{
-  BigQueryArrowStorage,
-  BigQueryAvroStorage,
-  BigQueryStorageAttributes,
-  GrpcBigQueryStorageReader
+  BigQueryArrowStorage, BigQueryAvroStorage, BigQueryStorageAttributes, GrpcBigQueryStorageReader
 }
 import org.scalatestplus.mockito.MockitoSugar.mock
diff --git a/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryAvroStorageSpec.scala b/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryAvroStorageSpec.scala
index 326e0997b..b9f9e125a 100644
--- a/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryAvroStorageSpec.scala
+++ b/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryAvroStorageSpec.scala
@@ -16,9 +16,7 @@ package org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl
 import org.apache.pekko
 import pekko.stream.connectors.googlecloud.bigquery.storage.impl.AvroDecoder
 import pekko.stream.connectors.googlecloud.bigquery.storage.{
-  BigQueryRecord,
-  BigQueryStorageSettings,
-  BigQueryStorageSpecBase
+  BigQueryRecord, BigQueryStorageSettings, BigQueryStorageSpecBase
 }
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.scaladsl.Sink
diff --git a/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryStorageSpec.scala b/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryStorageSpec.scala
index efff7c244..53aa9eadb 100644
--- a/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryStorageSpec.scala
+++ b/google-cloud-bigquery-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/storage/scaladsl/BigQueryStorageSpec.scala
@@ -16,9 +16,7 @@ package org.apache.pekko.stream.connectors.googlecloud.bigquery.storage.scaladsl
 import org.apache.pekko
 import pekko.stream.connectors.googlecloud.bigquery.storage.impl.{ AvroDecoder, SimpleRowReader }
 import pekko.stream.connectors.googlecloud.bigquery.storage.{
-  BigQueryRecord,
-  BigQueryStorageSettings,
-  BigQueryStorageSpecBase
+  BigQueryRecord, BigQueryStorageSettings, BigQueryStorageSpecBase
 }
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.scaladsl.Sink
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryExt.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryExt.scala
index 61ab2ce1e..32a3dbf1c 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryExt.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryExt.scala
@@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.googlecloud.bigquery

 import org.apache.pekko
 import pekko.actor.{
-  ActorSystem,
-  ClassicActorSystemProvider,
-  ExtendedActorSystem,
-  Extension,
-  ExtensionId,
-  ExtensionIdProvider
+
   ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider
 }
 import pekko.annotation.InternalApi
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala
index 4a19a3b46..96329678e 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala
@@ -28,9 +28,7 @@ import pekko.stream.connectors.googlecloud.bigquery.model.Dataset
 import pekko.stream.connectors.googlecloud.bigquery.model.{ Job, JobCancelResponse, JobReference }
 import pekko.stream.connectors.googlecloud.bigquery.model.{ QueryRequest, QueryResponse }
 import pekko.stream.connectors.googlecloud.bigquery.model.{
-  TableDataInsertAllRequest,
-  TableDataInsertAllResponse,
-  TableDataListResponse
+  TableDataInsertAllRequest, TableDataInsertAllResponse, TableDataListResponse
 }
 import pekko.stream.connectors.googlecloud.bigquery.model.{ Table, TableListResponse, TableReference, TableSchema }
 import pekko.stream.connectors.googlecloud.bigquery.scaladsl.{ BigQuery => ScalaBigQuery }
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryJobs.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryJobs.scala
index c6ea7b12f..6bd66948d 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryJobs.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryJobs.scala
@@ -31,10 +31,7 @@ import pekko.stream.connectors.googlecloud.bigquery._
 import pekko.stream.connectors.googlecloud.bigquery.model.CreateDisposition.CreateNever
 import pekko.stream.connectors.googlecloud.bigquery.model.SourceFormat.NewlineDelimitedJsonFormat
 import pekko.stream.connectors.googlecloud.bigquery.model.{
-  Job,
-  JobCancelResponse,
-  JobConfiguration,
-  JobConfigurationLoad
+  Job, JobCancelResponse, JobConfiguration, JobConfigurationLoad
 }
 import pekko.stream.connectors.googlecloud.bigquery.model.TableReference
 import pekko.stream.connectors.googlecloud.bigquery.model.WriteDisposition.WriteAppend
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryTableData.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryTableData.scala
index 8fb9680b5..35b5ad754 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryTableData.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/BigQueryTableData.scala
@@ -26,10 +26,7 @@ import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings }
 import pekko.stream.connectors.google.http.GoogleHttp
 import pekko.stream.connectors.google.implicits._
 import pekko.stream.connectors.googlecloud.bigquery.model.{
-  Row,
-  TableDataInsertAllRequest,
-  TableDataInsertAllResponse,
-  TableDataListResponse
+  Row, TableDataInsertAllRequest, TableDataInsertAllResponse, TableDataListResponse
 }
 import pekko.stream.connectors.googlecloud.bigquery.{ BigQueryEndpoints, BigQueryException, InsertAllRetryPolicy }
 import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BasicSchemas.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BasicSchemas.scala
index 6961cbe4a..c816a3aee 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BasicSchemas.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/BasicSchemas.scala
@@ -14,11 +14,7 @@
 package org.apache.pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema

 import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableFieldSchemaType.{
-  Boolean,
-  Float,
-  Integer,
-  Numeric,
-  String
+  Boolean, Float, Integer, Numeric, String
 }

 /**
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/JavaTimeSchemas.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/JavaTimeSchemas.scala
index 48631efc2..262ddadeb 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/JavaTimeSchemas.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/JavaTimeSchemas.scala
@@ -14,10 +14,7 @@
 package org.apache.pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema

 import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.TableFieldSchemaType.{
-  Date,
-  DateTime,
-  Time,
-  Timestamp
+  Date, DateTime, Time, Timestamp
 }

 import java.time.{ Instant, LocalDate, LocalDateTime, LocalTime }
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/PrimitiveSchemaWriter.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/PrimitiveSchemaWriter.scala
index b4c469b16..6dd44c931 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/PrimitiveSchemaWriter.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/PrimitiveSchemaWriter.scala
@@ -14,9 +14,7 @@
 package org.apache.pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema

 import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.{
-  TableFieldSchema,
-  TableFieldSchemaMode,
-  TableFieldSchemaType
+  TableFieldSchema, TableFieldSchemaMode, TableFieldSchemaType
 }

 private[schema] final class PrimitiveSchemaWriter[T](`type`: TableFieldSchemaType) extends SchemaWriter[T] {
diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/SchemaWriter.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/SchemaWriter.scala
index 70194bf34..0bf586806 100644
--- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/SchemaWriter.scala
+++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/SchemaWriter.scala
@@ -14,9 +14,7
 @@
 package org.apache.pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema

 import org.apache.pekko.stream.connectors.googlecloud.bigquery.model.{
-  TableFieldSchema,
-  TableFieldSchemaMode,
-  TableSchema
+  TableFieldSchema, TableFieldSchemaMode, TableSchema
 }

 import scala.annotation.implicitNotFound
diff --git a/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala b/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala
index 2c71e35e2..fa5f270c1 100644
--- a/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala
+++ b/google-cloud-bigquery/src/test/scala/docs/scaladsl/BigQueryDoc.scala
@@ -19,14 +19,7 @@ import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings }
 import pekko.stream.connectors.googlecloud.bigquery.InsertAllRetryPolicy
 import pekko.stream.connectors.googlecloud.bigquery.model.{
-  Dataset,
-  Job,
-  JobReference,
-  JobState,
-  QueryResponse,
-  Table,
-  TableDataListResponse,
-  TableListResponse
+  Dataset, Job, JobReference, JobState, QueryResponse, Table, TableDataListResponse, TableListResponse
 }
 import pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema.BigQuerySchemas._
 import pekko.stream.connectors.googlecloud.bigquery.scaladsl.schema.TableSchemaWriter
diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala
index 9fd70656a..8b6e33aca 100644
--- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala
+++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala
@@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.javadsl

 import org.apache.pekko
 import pekko.actor.{
-  ActorSystem,
-  ClassicActorSystemProvider,
-  ExtendedActorSystem,
-  Extension,
-  ExtensionId,
-  ExtensionIdProvider
+  ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider
 }
 import pekko.annotation.ApiMayChange
 import pekko.stream.connectors.google.GoogleSettings
diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala
index de2e738ad..ac798c346 100644
--- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala
+++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala
@@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.javadsl

 import org.apache.pekko
 import pekko.actor.{
-  ActorSystem,
-  ClassicActorSystemProvider,
-  ExtendedActorSystem,
-  Extension,
-  ExtensionId,
-  ExtensionIdProvider
+  ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider
 }
 import pekko.annotation.ApiMayChange
 import pekko.stream.connectors.google.GoogleSettings
diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala
index 286c6d78e..f84bf2055 100644
--- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala
+++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala
@@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl

 import org.apache.pekko
 import pekko.actor.{
-  ActorSystem,
-  ClassicActorSystemProvider,
-  ExtendedActorSystem,
-  Extension,
-  ExtensionId,
-  ExtensionIdProvider
+  ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider
 }
 import pekko.annotation.ApiMayChange
 import pekko.stream.connectors.google.GoogleSettings
diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala
index 0d70d861d..a47f8ce0e 100644
--- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala
+++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala
@@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl

 import org.apache.pekko
 import pekko.actor.{
-  ActorSystem,
-  ClassicActorSystemProvider,
-  ExtendedActorSystem,
-  Extension,
-  ExtensionId,
-  ExtensionIdProvider
+  ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider
 }
 import pekko.annotation.ApiMayChange
 import pekko.stream.connectors.google.GoogleSettings
diff --git a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/model.scala b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/model.scala
index 4b40aae0a..14e648712 100644
--- a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/model.scala
+++ b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/model.scala
@@ -124,15 +124,18 @@ final class PubSubMessage private (val data: Option[String],
   override def equals(other: Any): Boolean =
     other match {
       case that: PubSubMessage =>
-        data == that.data && attributes == that.attributes && messageId == that.messageId && publishTime == that
-          .publishTime && orderingKey == that.orderingKey
+        data == that.data && attributes == that.attributes && messageId == that.messageId &&
+        publishTime ==
+        that
+          .publishTime && orderingKey == that.orderingKey
       case _ => false
     }

   override def hashCode: Int = java.util.Objects.hash(data, attributes, messageId, publishTime, orderingKey)

   override def toString: String =
-    "PubSubMessage(data=" + data + ",attributes=" + attributes + ",messageId=" + messageId + ",publishTime=" + publishTime + ",orderingKey=" + orderingKey + ")"
+    "PubSubMessage(data=" + data + ",attributes=" + attributes + ",messageId=" + messageId + ",publishTime=" +
+    publishTime + ",orderingKey=" + orderingKey + ")"
 }

 object PubSubMessage {
diff --git a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/ModelSpec.scala b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/ModelSpec.scala
index 6b517eef6..adf249271 100644
--- a/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/ModelSpec.scala
+++ b/google-cloud-pub-sub/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/ModelSpec.scala
@@ -81,8 +81,10 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
   }

   test("PubSubMessage toString") {
-    pubSubMessage1.toString shouldBe "PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=None)"
-    pubSubMessage7.toString shouldBe "PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=Some(qwe))"
+    pubSubMessage1.toString shouldBe
+      "PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=None)"
+    pubSubMessage7.toString shouldBe
+      "PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=Some(qwe))"
   }

   val receivedMessage1 = ReceivedMessage("1", pubSubMessage1)
@@ -149,8 +151,10 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
   }

   test("PublishRequest toString") {
-    publishRequest1.toString shouldBe "PublishRequest([PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=None)])"
-    publishRequest4.toString shouldBe "PublishRequest([PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=Some(qwe))])"
+    publishRequest1.toString shouldBe
+      "PublishRequest([PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=None)])"
+    publishRequest4.toString shouldBe
+      "PublishRequest([PublishMessage(data=abcde,attributes=Some(Map(k1 -> v1, k2 -> v2)),orderingKey=Some(qwe))])"
   }

   private val publishResponse1 = PublishResponse(Seq.empty[String])
@@ -189,7 +193,9 @@ class ModelSpec extends AnyFunSuite with Matchers with LogCapturing {
   }

   test("PullResponse toString") {
-    pullResponse1.toString shouldBe "PullResponse(Some([ReceivedMessage(ackId=1,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=None))]))"
-    pullResponse4.toString shouldBe "PullResponse(Some([ReceivedMessage(ackId=2,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=Some(qwe)))]))"
+    pullResponse1.toString shouldBe
+      "PullResponse(Some([ReceivedMessage(ackId=1,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=None))]))"
+    pullResponse4.toString shouldBe
+      "PullResponse(Some([ReceivedMessage(ackId=2,message=PubSubMessage(data=Some(data),attributes=Some(Map(k1 -> v1)),messageId=Id-1,publishTime=1970-01-01T00:00:00Z,orderingKey=Some(qwe)))]))"
   }
 }
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleExt.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleExt.scala
index bb6680a89..179a7318d 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleExt.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleExt.scala
@@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.google

 import org.apache.pekko
 import pekko.actor.{
-  ActorSystem,
-
ClassicActorSystemProvider, - ExtendedActorSystem, - Extension, - ExtensionId, - ExtensionIdProvider + ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import pekko.annotation.InternalApi diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/EitherFlow.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/EitherFlow.scala index 3904db2cc..9ee79823a 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/EitherFlow.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/EitherFlow.scala @@ -31,7 +31,8 @@ private[google] object EitherFlow { val in = b.add(Partition[Either[LeftIn, RightIn]](2, x => if (x.isRight) 1 else 0)) val out = b.add(Merge[Either[LeftOut, RightOut]](2)) in ~> Flow[Either[LeftIn, RightIn]].map(_.swap.toOption.get) ~> leftFlow ~> Flow[LeftOut].map(Left(_)) ~> out - in ~> Flow[Either[LeftIn, RightIn]].map(_.toOption.get) ~> rightFlow ~> Flow[RightOut] + in ~> Flow[Either[LeftIn, RightIn]].map(_.toOption.get) ~> rightFlow ~> + Flow[RightOut] .map( Right(_)) ~> out FlowShape(in.in, out.out) diff --git a/huawei-push-kit/src/main/scala/org/apache/pekko/stream/connectors/huawei/pushkit/HmsSettingExt.scala b/huawei-push-kit/src/main/scala/org/apache/pekko/stream/connectors/huawei/pushkit/HmsSettingExt.scala index 019e6d2d4..9c03fd920 100644 --- a/huawei-push-kit/src/main/scala/org/apache/pekko/stream/connectors/huawei/pushkit/HmsSettingExt.scala +++ b/huawei-push-kit/src/main/scala/org/apache/pekko/stream/connectors/huawei/pushkit/HmsSettingExt.scala @@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.huawei.pushkit import org.apache.pekko import pekko.actor.{ - ActorSystem, - ClassicActorSystemProvider, - ExtendedActorSystem, - Extension, - ExtensionId, - ExtensionIdProvider + ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import pekko.annotation.InternalApi diff --git a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/javadsl/package.scala b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/javadsl/package.scala index 93d40b731..50fbaf540 100644 --- a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/javadsl/package.scala +++ b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/javadsl/package.scala @@ -18,8 +18,7 @@ import java.util.concurrent.CompletionStage import org.apache.pekko import pekko.Done import pekko.stream.connectors.ironmq.scaladsl.{ - Committable => ScalaCommittable, - CommittableMessage => ScalaCommittableMessage + Committable => ScalaCommittable, CommittableMessage => ScalaCommittableMessage } import scala.concurrent.Future diff --git a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/ConnectionRetrySettings.scala b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/ConnectionRetrySettings.scala index 13752d252..388cb0d1f 100644 --- a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/ConnectionRetrySettings.scala +++ b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/ConnectionRetrySettings.scala @@ -63,8 +63,9 @@ final class ConnectionRetrySettings private ( def withInfiniteRetries(): ConnectionRetrySettings = withMaxRetries(ConnectionRetrySettings.infiniteRetries) /** The wait time before the next attempt may be made. 
*/ - def waitTime(retryNumber: Int): FiniteDuration = - (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min(maxBackoff) + def waitTime(retryNumber: Int) + : FiniteDuration = (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min( + maxBackoff) private def copy( connectTimeout: scala.concurrent.duration.FiniteDuration = connectTimeout, diff --git a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/SendRetrySettings.scala b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/SendRetrySettings.scala index c56953302..e8e66ba14 100644 --- a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/SendRetrySettings.scala +++ b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/SendRetrySettings.scala @@ -51,8 +51,9 @@ final class SendRetrySettings private (val initialRetry: scala.concurrent.durati def withInfiniteRetries(): SendRetrySettings = withMaxRetries(SendRetrySettings.infiniteRetries) /** The wait time before the next attempt may be made. */ - def waitTime(retryNumber: Int): FiniteDuration = - (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min(maxBackoff) + def waitTime(retryNumber: Int) + : FiniteDuration = (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min( + maxBackoff) private def copy( initialRetry: scala.concurrent.duration.FiniteDuration = initialRetry, diff --git a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala index cd4fae6b4..675919d5b 100644 --- a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala +++ b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala @@ -56,7 +56,8 @@ private[jakartams] object JmsMessageReader { .asInstanceOf[java.util.Enumeration[String]] .asScala .map { key => - key -> (accessor(key) match { + key -> + (accessor(key) match { case v: java.lang.Boolean => v.booleanValue() case v: java.lang.Byte => v.byteValue() case v: java.lang.Short => v.shortValue() diff --git a/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala b/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala index cdbedf6dc..257e432d1 100644 --- a/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala +++ b/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala @@ -98,7 +98,8 @@ class JmsBufferedAckConnectorsSpec extends JmsSharedServerSpec { } } - "publish JMS text messages with properties through a queue and consume them with a selector" in withConnectionFactory() { + "publish JMS text messages with properties through a queue and consume them with a selector" in + withConnectionFactory() { connectionFactory => val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")) diff --git a/jakartams/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala b/jakartams/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala index c34ee649a..c4cbc94cd 100644 --- a/jakartams/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala +++ b/jakartams/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala @@ -313,7 +313,8 @@ class JmsConnectorsSpec extends JmsSpec { } } - "publish JMS text messages with properties 
through a queue and consume them with a selector" in withConnectionFactory() { + "publish JMS text messages with properties through a queue and consume them with a selector" in + withConnectionFactory() { connectionFactory => val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")) @@ -774,8 +775,7 @@ class JmsConnectorsSpec extends JmsSpec { JmsProducerSettings(system, connectionFactory) .withQueue("test")) - val input: immutable.Seq[JmsTextMessage] = - (1 to 100).map(i => JmsTextMessage(i.toString)) + val input: immutable.Seq[JmsTextMessage] = (1 to 100).map(i => JmsTextMessage(i.toString)) val result: Future[Seq[JmsMessage]] = Source(input) .via(flow) diff --git a/jakartams/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala b/jakartams/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala index 069658087..824a84c0f 100644 --- a/jakartams/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala +++ b/jakartams/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala @@ -134,7 +134,8 @@ class JmsTxConnectorsSpec extends JmsSharedServerSpec { result.futureValue should contain theSameElementsAs expectedElements } - "publish JMS text messages with properties through a queue and consume them with a selector" in withConnectionFactory() { + "publish JMS text messages with properties through a queue and consume them with a selector" in + withConnectionFactory() { connectionFactory => val queueName = createName("numbers") val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( @@ -441,7 +442,8 @@ class JmsTxConnectorsSpec extends JmsSharedServerSpec { resultList.toSet should contain theSameElementsAs numsIn.map(_.toString) } - "ensure no message loss or starvation when exceptions occur in a stream missing commits" ignore withConnectionFactory() { + "ensure no message loss or starvation when exceptions occur in a stream missing commits" ignore + withConnectionFactory() { connectionFactory => val queueName = createName("numbers") val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( diff --git a/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala b/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala index e9ef7c491..59fbda90a 100644 --- a/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala +++ b/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala @@ -90,7 +90,8 @@ class JmsAckConnectorsSpec extends JmsSpec { } } - "publish JMS text messages with properties through a queue and consume them with a selector" in withConnectionFactory() { + "publish JMS text messages with properties through a queue and consume them with a selector" in + withConnectionFactory() { connectionFactory => val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")) diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/ConnectionRetrySettings.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/ConnectionRetrySettings.scala index fab75228f..0393ead8c 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/ConnectionRetrySettings.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/ConnectionRetrySettings.scala @@ -63,8 +63,9 @@ final class 
ConnectionRetrySettings private ( def withInfiniteRetries(): ConnectionRetrySettings = withMaxRetries(ConnectionRetrySettings.infiniteRetries) /** The wait time before the next attempt may be made. */ - def waitTime(retryNumber: Int): FiniteDuration = - (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min(maxBackoff) + def waitTime(retryNumber: Int) + : FiniteDuration = (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min( + maxBackoff) private def copy( connectTimeout: scala.concurrent.duration.FiniteDuration = connectTimeout, diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/SendRetrySettings.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/SendRetrySettings.scala index 5550b5105..dcc1a73e1 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/SendRetrySettings.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/SendRetrySettings.scala @@ -51,8 +51,9 @@ final class SendRetrySettings private (val initialRetry: scala.concurrent.durati def withInfiniteRetries(): SendRetrySettings = withMaxRetries(SendRetrySettings.infiniteRetries) /** The wait time before the next attempt may be made. */ - def waitTime(retryNumber: Int): FiniteDuration = - (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min(maxBackoff) + def waitTime(retryNumber: Int) + : FiniteDuration = (initialRetry * Math.pow(retryNumber, backoffFactor)).asInstanceOf[FiniteDuration].min( + maxBackoff) private def copy( initialRetry: scala.concurrent.duration.FiniteDuration = initialRetry, diff --git a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala index 19710d5b0..e0890c9ec 100644 --- a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala +++ b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala @@ -57,7 +57,8 @@ private[jms] object JmsMessageReader { .asInstanceOf[java.util.Enumeration[String]] .asScala .map { key => - key -> (accessor(key) match { + key -> + (accessor(key) match { case v: java.lang.Boolean => v.booleanValue() case v: java.lang.Byte => v.byteValue() case v: java.lang.Short => v.shortValue() diff --git a/jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala b/jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala index 0cb71fbcc..42c35f777 100644 --- a/jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala +++ b/jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala @@ -318,7 +318,8 @@ class JmsConnectorsSpec extends JmsSpec { } } - "publish JMS text messages with properties through a queue and consume them with a selector" in withConnectionFactory() { + "publish JMS text messages with properties through a queue and consume them with a selector" in + withConnectionFactory() { connectionFactory => val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")) @@ -764,8 +765,7 @@ class JmsConnectorsSpec extends JmsSpec { JmsProducerSettings(system, connectionFactory) .withQueue("test")) - val input: immutable.Seq[JmsTextMessage] = - (1 to 100).map(i => JmsTextMessage(i.toString)) + val input: immutable.Seq[JmsTextMessage] = (1 to 100).map(i => JmsTextMessage(i.toString)) val result: Future[Seq[JmsMessage]] = Source(input) .via(flow) diff --git 
a/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala b/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala index 4c61c2481..fa74b1c06 100644 --- a/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala +++ b/jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala @@ -135,7 +135,8 @@ class JmsTxConnectorsSpec extends JmsSharedServerSpec { result.futureValue should contain theSameElementsAs expectedElements } - "publish JMS text messages with properties through a queue and consume them with a selector" in withConnectionFactory() { + "publish JMS text messages with properties through a queue and consume them with a selector" in + withConnectionFactory() { connectionFactory => val queueName = createName("numbers") val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( @@ -441,7 +442,8 @@ class JmsTxConnectorsSpec extends JmsSharedServerSpec { resultList.toSet should contain theSameElementsAs numsIn.map(_.toString) } - "ensure no message loss or starvation when exceptions occur in a stream missing commits" in withConnectionFactory() { + "ensure no message loss or starvation when exceptions occur in a stream missing commits" in + withConnectionFactory() { connectionFactory => val queueName = createName("numbers") val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink( diff --git a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisFlow.scala b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisFlow.scala index 16efc29f9..9a52bebfc 100644 --- a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisFlow.scala +++ b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisFlow.scala @@ -25,10 +25,7 @@ import pekko.util.ByteString import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.services.kinesis.KinesisAsyncClient import software.amazon.awssdk.services.kinesis.model.{ - PutRecordsRequest, - PutRecordsRequestEntry, - PutRecordsResponse, - PutRecordsResultEntry + PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResponse, PutRecordsResultEntry } import scala.collection.immutable.Queue diff --git a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisSchedulerSource.scala b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisSchedulerSource.scala index 019769dba..1f028c359 100644 --- a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisSchedulerSource.scala +++ b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/scaladsl/KinesisSchedulerSource.scala @@ -18,9 +18,7 @@ import pekko.NotUsed import pekko.stream._ import pekko.stream.connectors.kinesis.impl.KinesisSchedulerSourceStage import pekko.stream.connectors.kinesis.{ - CommittableRecord, - KinesisSchedulerCheckpointSettings, - KinesisSchedulerSourceSettings + CommittableRecord, KinesisSchedulerCheckpointSettings, KinesisSchedulerSourceSettings } import pekko.stream.scaladsl.{ Flow, RunnableGraph, Sink, Source, SubFlow } import software.amazon.kinesis.coordinator.Scheduler diff --git a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala index 78ada709e..b77b6ddb9 100644 --- a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala +++ 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala @@ -40,9 +40,7 @@ import software.amazon.kinesis.coordinator.Scheduler import software.amazon.kinesis.lifecycle.ShutdownReason import software.amazon.kinesis.lifecycle.events.{ InitializationInput, ProcessRecordsInput, ShardEndedInput } import software.amazon.kinesis.processor.{ - RecordProcessorCheckpointer, - ShardRecordProcessor, - ShardRecordProcessorFactory + RecordProcessorCheckpointer, ShardRecordProcessor, ShardRecordProcessorFactory } import software.amazon.kinesis.retrieval.KinesisClientRecord import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber @@ -68,7 +66,8 @@ class KinesisSchedulerSourceSpec recordProcessor.processRecords(sampleRecordsInput()) val producedRecord: CommittableRecord = sinkProbe.requestNext() - producedRecord.processorData.recordProcessorStartingSequenceNumber shouldBe initializationInput + producedRecord.processorData.recordProcessorStartingSequenceNumber shouldBe + initializationInput .extendedSequenceNumber() producedRecord.processorData.shardId shouldBe initializationInput.shardId() producedRecord.record shouldBe sampleRecord @@ -116,7 +115,8 @@ class KinesisSchedulerSourceSpec recordProcessor.processRecords(sampleRecordsInput()) var producedRecord: CommittableRecord = sinkProbe.requestNext() - producedRecord.processorData.recordProcessorStartingSequenceNumber shouldBe initializationInput + producedRecord.processorData.recordProcessorStartingSequenceNumber shouldBe + initializationInput .extendedSequenceNumber() producedRecord.processorData.shardId shouldBe initializationInput.shardId() producedRecord.record shouldBe sampleRecord @@ -127,7 +127,8 @@ class KinesisSchedulerSourceSpec otherRecordProcessor.processRecords(sampleRecordsInput()) producedRecord = sinkProbe.requestNext() - producedRecord.processorData.recordProcessorStartingSequenceNumber shouldBe otherInitializationInput + producedRecord.processorData.recordProcessorStartingSequenceNumber shouldBe + otherInitializationInput .extendedSequenceNumber() producedRecord.processorData.shardId shouldBe otherInitializationInput.shardId() producedRecord.record shouldBe sampleRecord diff --git a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/ShardSettingsSpec.scala b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/ShardSettingsSpec.scala index 254051d0a..d4fa3d099 100644 --- a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/ShardSettingsSpec.scala +++ b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/ShardSettingsSpec.scala @@ -31,7 +31,8 @@ class ShardSettingsSpec extends AnyWordSpec with Matchers with LogCapturing { } "accept all combinations of alterations with ShardIterator" in { - noException should be thrownBy baseSettings + noException should be thrownBy + baseSettings .withShardIterator(ShardIterator.AtSequenceNumber("SQC")) .withShardIterator(ShardIterator.AfterSequenceNumber("SQC")) .withShardIterator(ShardIterator.AtTimestamp(Instant.EPOCH)) diff --git a/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoFlow.scala b/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoFlow.scala index a5d67ccf6..447281202 100644 --- a/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoFlow.scala +++ b/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoFlow.scala @@ -17,11 +17,7 @@ import org.apache.pekko import 
pekko.NotUsed import pekko.stream.connectors.mongodb.{ scaladsl, DocumentReplace, DocumentUpdate } import pekko.stream.connectors.mongodb.scaladsl.MongoFlow.{ - DefaultDeleteOptions, - DefaultInsertManyOptions, - DefaultInsertOneOptions, - DefaultReplaceOptions, - DefaultUpdateOptions + DefaultDeleteOptions, DefaultInsertManyOptions, DefaultInsertOneOptions, DefaultReplaceOptions, DefaultUpdateOptions } import pekko.stream.javadsl.Flow import com.mongodb.client.model.{ DeleteOptions, InsertManyOptions, InsertOneOptions, ReplaceOptions, UpdateOptions } diff --git a/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoSink.scala b/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoSink.scala index 9d7cbeb77..1a68f5120 100644 --- a/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoSink.scala +++ b/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/javadsl/MongoSink.scala @@ -19,11 +19,7 @@ import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.stream.connectors.mongodb.{ DocumentReplace, DocumentUpdate } import pekko.stream.connectors.mongodb.scaladsl.MongoFlow.{ - DefaultDeleteOptions, - DefaultInsertManyOptions, - DefaultInsertOneOptions, - DefaultReplaceOptions, - DefaultUpdateOptions + DefaultDeleteOptions, DefaultInsertManyOptions, DefaultInsertOneOptions, DefaultReplaceOptions, DefaultUpdateOptions } import pekko.stream.javadsl.{ Keep, Sink } import com.mongodb.client.model.{ DeleteOptions, InsertManyOptions, InsertOneOptions, ReplaceOptions, UpdateOptions } diff --git a/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/scaladsl/MongoSink.scala b/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/scaladsl/MongoSink.scala index 6822b7deb..0d172a592 100644 --- a/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/scaladsl/MongoSink.scala +++ b/mongodb/src/main/scala/org/apache/pekko/stream/connectors/mongodb/scaladsl/MongoSink.scala @@ -18,11 +18,7 @@ import pekko.stream.scaladsl.{ Keep, Sink } import pekko.Done import pekko.stream.connectors.mongodb.{ DocumentReplace, DocumentUpdate } import pekko.stream.connectors.mongodb.scaladsl.MongoFlow.{ - DefaultDeleteOptions, - DefaultInsertManyOptions, - DefaultInsertOneOptions, - DefaultReplaceOptions, - DefaultUpdateOptions + DefaultDeleteOptions, DefaultInsertManyOptions, DefaultInsertOneOptions, DefaultReplaceOptions, DefaultUpdateOptions } import com.mongodb.client.model.{ DeleteOptions, InsertManyOptions, InsertOneOptions, ReplaceOptions, UpdateOptions } import com.mongodb.reactivestreams.client.MongoCollection diff --git a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala index 9da0351ec..2f8a5807e 100644 --- a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala +++ b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala @@ -182,7 +182,8 @@ class MongoSinkSpec val found = Source.fromPublisher(numbersDocumentColl.find()).runWith(Sink.seq).futureValue found.map(doc => - doc.getInteger("value") -> doc.getInteger("updateValue")) must contain theSameElementsAs testRange + doc.getInteger("value") -> doc.getInteger("updateValue")) must contain theSameElementsAs + testRange .map(i => i -> i * -1) } @@ -198,7 +199,8 @@ class MongoSinkSpec val found = Source.fromPublisher(numbersDocumentColl.find()).runWith(Sink.seq).futureValue found.map(doc => - doc.getInteger("value") -> doc.getInteger("updateValue")) must contain 
theSameElementsAs testRange + doc.getInteger("value") -> doc.getInteger("updateValue")) must contain theSameElementsAs + testRange .map(i => i -> 0) } diff --git a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/javadsl/MqttSession.scala b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/javadsl/MqttSession.scala index 5b537f4ad..323c4e0cf 100644 --- a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/javadsl/MqttSession.scala +++ b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/javadsl/MqttSession.scala @@ -18,10 +18,8 @@ import org.apache.pekko import pekko.NotUsed import pekko.actor.ClassicActorSystemProvider import pekko.stream.connectors.mqtt.streaming.scaladsl.{ - ActorMqttClientSession => ScalaActorMqttClientSession, - ActorMqttServerSession => ScalaActorMqttServerSession, - MqttClientSession => ScalaMqttClientSession, - MqttServerSession => ScalaMqttServerSession + ActorMqttClientSession => ScalaActorMqttClientSession, ActorMqttServerSession => ScalaActorMqttServerSession, + MqttClientSession => ScalaMqttClientSession, MqttServerSession => ScalaMqttServerSession } import pekko.stream.javadsl.Source diff --git a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala index 98ab28889..2bdee1c89 100644 --- a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala +++ b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala @@ -94,8 +94,7 @@ final case class ControlPacketFlags(underlying: Int) extends AnyVal { /** * Convenience for testing bits - returns true if all passed in are set */ - def contains(bits: ControlPacketFlags): Boolean = - (underlying & bits.underlying) == bits.underlying + def contains(bits: ControlPacketFlags): Boolean = (underlying & bits.underlying) == bits.underlying } /** @@ -147,8 +146,7 @@ final case class ConnectFlags private[streaming] (underlying: Int) extends AnyVa /** * Convenience for testing bits - returns true if all passed in are set */ - def contains(bits: ConnectFlags): Boolean = - (underlying & bits.underlying) == bits.underlying + def contains(bits: ConnectFlags): Boolean = (underlying & bits.underlying) == bits.underlying } object Connect { @@ -258,8 +256,7 @@ final case class ConnAckReturnCode private[streaming] (underlying: Int) extends /** * Convenience for testing bits - returns true if all passed in are set */ - def contains(bits: ConnAckReturnCode): Boolean = - (underlying & bits.underlying) == bits.underlying + def contains(bits: ConnAckReturnCode): Boolean = (underlying & bits.underlying) == bits.underlying } /** diff --git a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala index eb66ea3a4..7af83df97 100644 --- a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala +++ b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala @@ -586,7 +586,8 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit syste case Command(cp: UnsubAck, completed, _) => val reply = Promise[Unpublisher.ForwardUnsubAck.type]() - unpublisherPacketRouter ! 
RemotePacketRouter + unpublisherPacketRouter ! + RemotePacketRouter .RouteViaConnection(connectionId, cp.packetId, Unpublisher.UnsubAckReceivedLocally(reply), reply) reply.future.onComplete { result => diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala index af162086f..4b65b51b3 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttCodecSpec.scala @@ -128,7 +128,8 @@ class MqttCodecSpec extends AnyWordSpec with Matchers with LogCapturing { Connect.Mqtt, Connect.v311, "some-client-id", - ConnectFlags.CleanSession | ConnectFlags.WillFlag | ConnectFlags.WillQoS | ConnectFlags + ConnectFlags.CleanSession | ConnectFlags.WillFlag | ConnectFlags.WillQoS | + ConnectFlags .WillRetain | ConnectFlags.PasswordFlag | ConnectFlags.UsernameFlag, 1.second, Some("some-will-topic"), diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala index 385ab54e3..87e5be81c 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala @@ -19,10 +19,7 @@ import pekko.actor.ActorSystem import pekko.pattern.ask import pekko.stream.connectors.mqtt.streaming._ import pekko.stream.connectors.mqtt.streaming.scaladsl.{ - ActorMqttClientSession, - ActorMqttServerSession, - Mqtt, - MqttServerSession + ActorMqttClientSession, ActorMqttServerSession, Mqtt, MqttServerSession } import pekko.stream.connectors.testkit.scaladsl.LogCapturing import pekko.stream.scaladsl.{ BroadcastHub, Flow, Keep, Sink, Source, SourceQueueWithComplete } @@ -520,7 +517,8 @@ class MqttSessionSpec client.watchCompletion().foreach(_ => session.shutdown()) } - "receive a QoS 1 publication from a subscribed topic and ack it and then ack it again - the stream should ignore" in assertAllStagesStopped { + "receive a QoS 1 publication from a subscribed topic and ack it and then ack it again - the stream should ignore" in + assertAllStagesStopped { // longer patience needed since Akka 2.6 implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(1.second), scaled(50.millis)) @@ -673,7 +671,8 @@ class MqttSessionSpec client.watchCompletion().foreach(_ => session.shutdown()) } - "receive a QoS 1 publication with DUP indicated from a unsubscribed topic - simulates a reconnect" in assertAllStagesStopped { + "receive a QoS 1 publication with DUP indicated from a unsubscribed topic - simulates a reconnect" in + assertAllStagesStopped { val session = ActorMqttClientSession(settings) val server = TestProbe() diff --git a/mqtt/src/main/scala/org/apache/pekko/stream/connectors/mqtt/impl/MqttFlowStage.scala b/mqtt/src/main/scala/org/apache/pekko/stream/connectors/mqtt/impl/MqttFlowStage.scala index 9abd66c94..dc7c787f8 100644 --- a/mqtt/src/main/scala/org/apache/pekko/stream/connectors/mqtt/impl/MqttFlowStage.scala +++ b/mqtt/src/main/scala/org/apache/pekko/stream/connectors/mqtt/impl/MqttFlowStage.scala @@ -27,16 +27,8 @@ import pekko.stream.stage._ import pekko.util.ByteString import org.eclipse.paho.client.mqttv3.{ - DisconnectedBufferOptions, - IMqttActionListener, - IMqttAsyncClient, - IMqttDeliveryToken, - IMqttToken, - MqttAsyncClient, - MqttCallbackExtended, - MqttConnectOptions, - MqttException, - MqttMessage => PahoMqttMessage + DisconnectedBufferOptions, IMqttActionListener, IMqttAsyncClient, 
IMqttDeliveryToken, IMqttToken, MqttAsyncClient, + MqttCallbackExtended, MqttConnectOptions, MqttException, MqttMessage => PahoMqttMessage } import scala.collection.mutable diff --git a/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala b/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala index f4dff41f9..753541ccc 100644 --- a/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala +++ b/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala @@ -18,10 +18,7 @@ import pekko.Done import pekko.actor.ActorSystem import pekko.stream.connectors.orientdb.scaladsl._ import pekko.stream.connectors.orientdb.{ - OrientDbReadResult, - OrientDbSourceSettings, - OrientDbWriteMessage, - OrientDbWriteSettings + OrientDbReadResult, OrientDbSourceSettings, OrientDbWriteMessage, OrientDbWriteSettings } import pekko.stream.connectors.testkit.scaladsl.LogCapturing import pekko.stream.scaladsl.{ Sink, Source } diff --git a/pravega/src/main/scala/org/apache/pekko/stream/connectors/pravega/impl/PravegaTableSource.scala b/pravega/src/main/scala/org/apache/pekko/stream/connectors/pravega/impl/PravegaTableSource.scala index e5d6ed852..ac51e4c46 100644 --- a/pravega/src/main/scala/org/apache/pekko/stream/connectors/pravega/impl/PravegaTableSource.scala +++ b/pravega/src/main/scala/org/apache/pekko/stream/connectors/pravega/impl/PravegaTableSource.scala @@ -28,10 +28,7 @@ import pekko.stream.ActorAttributes import pekko.stream.connectors.pravega.TableReaderSettings import io.pravega.client.KeyValueTableFactory import io.pravega.client.tables.{ - IteratorItem, - KeyValueTable, - KeyValueTableClientConfiguration, - TableEntry => JTableEntry + IteratorItem, KeyValueTable, KeyValueTableClientConfiguration, TableEntry => JTableEntry } import pekko.stream.connectors.pravega.TableEntry diff --git a/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala b/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala index 6be579e03..e21626d92 100644 --- a/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala +++ b/pravega/src/test/scala/docs/scaladsl/PravegaReadWriteDocs.scala @@ -16,11 +16,7 @@ package docs.scaladsl import org.apache.pekko import pekko.actor.ActorSystem import pekko.stream.connectors.pravega.{ - PravegaEvent, - ReaderSettingsBuilder, - TableReaderSettingsBuilder, - TableWriterSettings, - TableWriterSettingsBuilder, + PravegaEvent, ReaderSettingsBuilder, TableReaderSettingsBuilder, TableWriterSettings, TableWriterSettingsBuilder, WriterSettingsBuilder } import pekko.stream.scaladsl.{ Sink, Source } diff --git a/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala b/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala index b109878c9..8c7746faf 100644 --- a/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala +++ b/pravega/src/test/scala/docs/scaladsl/PravegaSettingsSpec.scala @@ -15,11 +15,7 @@ package docs.scaladsl import java.net.URI import org.apache.pekko.stream.connectors.pravega.{ - PravegaBaseSpec, - ReaderSettingsBuilder, - TableReaderSettingsBuilder, - TableWriterSettingsBuilder, - WriterSettingsBuilder + PravegaBaseSpec, ReaderSettingsBuilder, TableReaderSettingsBuilder, TableWriterSettingsBuilder, WriterSettingsBuilder } import io.pravega.client.stream.Serializer import io.pravega.client.stream.impl.UTF8StringSerializer diff --git a/project/Common.scala b/project/Common.scala index 1355f3b82..9a6c9d2ae 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -37,7 +37,8 @@ object Common extends AutoPlugin { "Contributors", 
"[email protected]", url("https://github.com/apache/pekko-connectors/graphs/contributors")), - description := "Apache Pekko Connectors is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Pekko.", + description := + "Apache Pekko Connectors is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Pekko.", fatalWarnings := true, mimaReportSignatureProblems := true, // Ignore unused keys which affect documentation @@ -104,11 +105,12 @@ object Common extends AutoPlugin { "-Xlint:try", "-Xlint:unchecked", "-Xlint:varargs"), - compile / javacOptions ++= (scalaVersion.value match { - case Dependencies.Scala213 if insideCI.value && fatalWarnings.value && !Dependencies.CronBuild => - Seq("-Werror") - case _ => Seq.empty[String] - }), + compile / javacOptions ++= + (scalaVersion.value match { + case Dependencies.Scala213 if insideCI.value && fatalWarnings.value && !Dependencies.CronBuild => + Seq("-Werror") + case _ => Seq.empty[String] + }), autoAPIMappings := true, apiURL := { val apiVersion = if (isSnapshot.value) "current" else version.value diff --git a/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/Resource.scala b/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/Resource.scala index 461fbe92f..32f5ba45b 100644 --- a/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/Resource.scala +++ b/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/Resource.scala @@ -15,12 +15,7 @@ package org.apache.pekko.stream.connectors.reference import org.apache.pekko import pekko.actor.{ - ActorSystem, - ClassicActorSystemProvider, - ExtendedActorSystem, - Extension, - ExtensionId, - ExtensionIdProvider + ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import pekko.stream.scaladsl.Flow import pekko.util.ByteString diff --git a/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/javadsl/Reference.scala b/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/javadsl/Reference.scala index 8a430b6db..a6475bb50 100644 --- a/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/javadsl/Reference.scala +++ b/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/javadsl/Reference.scala @@ -19,10 +19,7 @@ import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.stream.connectors.reference.scaladsl import pekko.stream.connectors.reference.{ - ReferenceReadResult, - ReferenceWriteMessage, - ReferenceWriteResult, - SourceSettings + ReferenceReadResult, ReferenceWriteMessage, ReferenceWriteResult, SourceSettings } import pekko.stream.javadsl.{ Flow, Source } diff --git a/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/scaladsl/Reference.scala b/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/scaladsl/Reference.scala index 646b84444..e9521a451 100644 --- a/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/scaladsl/Reference.scala +++ b/reference/src/main/scala/org/apache/pekko/stream/connectors/reference/scaladsl/Reference.scala @@ -18,9 +18,7 @@ import pekko.actor.ActorSystem import pekko.stream.Attributes import pekko.{ Done, NotUsed } import pekko.stream.connectors.reference.impl.{ - ReferenceFlowStage, - ReferenceSourceStage, - ReferenceWithResourceFlowStage + ReferenceFlowStage, ReferenceSourceStage, ReferenceWithResourceFlowStage } import 
pekko.stream.connectors.reference._ import pekko.stream.scaladsl.{ Flow, Source } diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala index e3d489738..e9622ca20 100644 --- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala +++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala @@ -25,12 +25,7 @@ import pekko.http.scaladsl.model.headers.{ `Raw-Request-URI`, Host, RawHeader } import pekko.http.scaladsl.model.{ RequestEntity, _ } import pekko.stream.connectors.s3.AccessStyle.{ PathAccessStyle, VirtualHostAccessStyle } import pekko.stream.connectors.s3.{ - ApiVersion, - BucketVersioning, - BucketVersioningStatus, - MFAStatus, - MultipartUpload, - S3Settings + ApiVersion, BucketVersioning, BucketVersioningStatus, MFAStatus, MultipartUpload, S3Settings } import pekko.stream.scaladsl.Source import pekko.util.ByteString diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala index 2581f60d9..77425c057 100644 --- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala +++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/Marshalling.scala @@ -189,10 +189,10 @@ import scala.xml.NodeSeq val bucket = (x \ "Bucket").text val key = (x \ "Key").text val uploadId = (x \ "UploadId").text - val partNumberMarker = - (x \ "PartNumberMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)).map(_.toInt) - val nextPartNumberMarker = - (x \ "NextPartNumberMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)).map(_.toInt) + val partNumberMarker = (x \ "PartNumberMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)).map( + _.toInt) + val nextPartNumberMarker = (x \ "NextPartNumberMarker").headOption.flatMap(x => + Utils.emptyStringToOption(x.text)).map(_.toInt) val maxParts = (x \ "MaxParts").text.toInt val truncated = (x \ isTruncated).text == "true" @@ -241,18 +241,12 @@ import scala.xml.NodeSeq case x => val bucket = (x \ "Bucket").text val name = (x \ "Name").text - val prefix = - (x \ "Prefix").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) - val keyMarker = - (x \ "KeyMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) - val nextKeyMarker = - (x \ "NextKeyMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) - val versionIdMarker = - (x \ "VersionIdMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) - val nextVersionIdMarker = - (x \ "NextVersionIdMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) - val delimiter = - (x \ "Delimiter").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) + val prefix = (x \ "Prefix").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) + val keyMarker = (x \ "KeyMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) + val nextKeyMarker = (x \ "NextKeyMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) + val versionIdMarker = (x \ "VersionIdMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) + val nextVersionIdMarker = (x \ "NextVersionIdMarker").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) + val delimiter = (x \ "Delimiter").headOption.flatMap(x => Utils.emptyStringToOption(x.text)) val maxKeys = (x \ "MaxKeys").text.toInt val truncated = (x \ isTruncated).text 
== "true" diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala index f8270026f..ad7212e78 100644 --- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala +++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/auth/CanonicalRequest.scala @@ -83,7 +83,8 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpRequest } .groupBy(_.lowercaseName) .map { case (name, headers) => - name -> headers + name -> + headers .map(header => header.value.replaceAll("\\s+", " ").trim) .mkString(",") } diff --git a/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala b/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala index de669ee75..897b294a5 100644 --- a/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala +++ b/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala @@ -22,10 +22,7 @@ import pekko.stream.connectors.s3._ import pekko.stream.scaladsl.{ Sink, Source } import pekko.util.ByteString import com.github.tomakehurst.wiremock.client.WireMock.{ - headRequestedFor, - postRequestedFor, - putRequestedFor, - urlEqualTo + headRequestedFor, postRequestedFor, putRequestedFor, urlEqualTo } import com.github.tomakehurst.wiremock.http.Fault import com.github.tomakehurst.wiremock.matching.EqualToPattern diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3ExceptionSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3ExceptionSpec.scala index 20b6a38db..914dba377 100644 --- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3ExceptionSpec.scala +++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3ExceptionSpec.scala @@ -21,7 +21,8 @@ class S3ExceptionSpec extends AnyFlatSpecLike with Matchers { "S3 exception" should "be parsed" in { val e = S3Exception("Hej", StatusCodes.OK) - e.toString shouldBe "org.apache.pekko.stream.connectors.s3.S3Exception: Hej (Status code: 200 OK, Code: 200 OK, RequestId: -, Resource: -)" + e.toString shouldBe + "org.apache.pekko.stream.connectors.s3.S3Exception: Hej (Status code: 200 OK, Code: 200 OK, RequestId: -, Resource: -)" } it should "parse AWS sample" in { @@ -37,12 +38,14 @@ class S3ExceptionSpec extends AnyFlatSpecLike with Matchers { e.message shouldBe "The resource you requested does not exist" e.requestId shouldBe "4442587FB7D0A2F9" e.resource shouldBe "/mybucket/myfoto.jpg" - e.toString shouldBe "org.apache.pekko.stream.connectors.s3.S3Exception: The resource you requested does not exist (Status code: 404 Not Found, Code: NoSuchKey, RequestId: 4442587FB7D0A2F9, Resource: /mybucket/myfoto.jpg)" + e.toString shouldBe + "org.apache.pekko.stream.connectors.s3.S3Exception: The resource you requested does not exist (Status code: 404 Not Found, Code: NoSuchKey, RequestId: 4442587FB7D0A2F9, Resource: /mybucket/myfoto.jpg)" } it should "survive null" in { val e = S3Exception(null, StatusCodes.NotFound) - e.toString shouldBe "org.apache.pekko.stream.connectors.s3.S3Exception (Status code: 404 Not Found, Code: 404 Not Found, RequestId: -, Resource: -)" + e.toString shouldBe + "org.apache.pekko.stream.connectors.s3.S3Exception (Status code: 404 Not Found, Code: 404 Not Found, RequestId: -, Resource: -)" } } diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala index 9dd1bd6bb..935fc0cbe 100644 --- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
@@ -242,12 +242,14 @@ class S3SettingsSpec extends S3WireMockBase with S3ClientIntegrationSpec with Op
     settings.listBucketApiVersion shouldEqual ApiVersion.ListBucketVersion1
   }

-  it should "instantiate with the list bucket api version 2 if list-bucket-api-version is set to a number that is neither 1 or 2" in {
+  it should
+    "instantiate with the list bucket api version 2 if list-bucket-api-version is set to a number that is neither 1 or 2" in {
     val settings: S3Settings = mkSettings("list-bucket-api-version = 0")
     settings.listBucketApiVersion shouldEqual ApiVersion.ListBucketVersion2
   }

-  it should "instantiate with the list bucket api version 2 if list-bucket-api-version is set to a value that is not a number" in {
+  it should
+    "instantiate with the list bucket api version 2 if list-bucket-api-version is set to a value that is not a number" in {
     val settings: S3Settings = mkSettings("list-bucket-api-version = 'version 1'")
     settings.listBucketApiVersion shouldEqual ApiVersion.ListBucketVersion2
   }
@@ -322,7 +324,8 @@
         | GetBucketVersioning = [GetBucketVersioning1, GetBucketVersioning2]
         |}""".stripMargin)

-    settings.allowedHeaders.keySet should contain allElementsOf (pekko.stream.connectors.s3.impl.S3Request.allRequests.map(
+    settings.allowedHeaders.keySet should contain allElementsOf
+    (pekko.stream.connectors.s3.impl.S3Request.allRequests.map(
       _.toString()))

     settings.allowedHeaders.foreach {
       case (key, value) =>
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/MarshallingSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/MarshallingSpec.scala
index 302118f4c..dc5e37b0e 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/MarshallingSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/MarshallingSpec.scala
@@ -121,7 +121,8 @@ class MarshallingSpec(_system: ActorSystem)
       |            <StorageClass>REDUCED_REDUNDANCY</StorageClass>
       |        </Contents>
       |</ListBucketResult>""".stripMargin

-  it should "Use the value of the `NextContinuationToken` element as the continuation token of a truncated API V1 response" in {
+  it should
+    "Use the value of the `NextContinuationToken` element as the continuation token of a truncated API V1 response" in {
     val entity = HttpEntity(MediaTypes.`application/xml`.withCharset(HttpCharsets.`UTF-8`),
       listBucketV2TruncatedResponse)

diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala
index 2871e6537..4d2c83a38 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSizeSpec.scala
@@ -50,7 +50,8 @@ class SplitAfterSizeSpec(_system: ActorSystem)
       .futureValue should be(Seq.empty)
   }

-  it should "start a new stream after the element that makes it reach a maximum, but not split the element itself" in assertAllStagesStopped {
+  it should "start a new stream after the element that makes it reach a maximum, but not split the element itself" in
+    assertAllStagesStopped {
       Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
         .via(
           SplitAfterSize(10, MaxChunkSize)(Flow[ByteString])
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala
index 740062365..c22ea6da4 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/auth/SplitAfterSizeWithContextSpec.scala
@@ -51,7 +51,8 @@ class SplitAfterSizeWithContextSpec(_system: ActorSystem)
       .futureValue should be(Seq.empty)
   }

-  it should "start a new stream after the element that makes it reach a maximum, but not split the element itself" in assertAllStagesStopped {
+  it should "start a new stream after the element that makes it reach a maximum, but not split the element itself" in
+    assertAllStagesStopped {
       Source(Vector((ByteString(1, 2, 3, 4, 5), 1), (ByteString(6, 7, 8, 9, 10, 11, 12), 2), (ByteString(13, 14), 3)))
         .via(
           SplitAfterSizeWithContext(10)(Flow[(ByteString, Int)])
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 3cfb32843..14eaa6752 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -477,7 +477,8 @@ trait S3IntegrationSpec
   }

   // we want ASCII and other UTF-8 characters!
-  it should "upload, download and delete with special characters in the key in non us-east-1 zone" in withBucketWithDots {
+  it should "upload, download and delete with special characters in the key in non us-east-1 zone" in
+    withBucketWithDots {
       bucketWithDots =>
         uploadDownloadAndDeleteInOtherRegionCase(bucketWithDots, "føldęrü/1234()[]><!? .TXT")
@@ -542,7 +543,8 @@ trait S3IntegrationSpec
     }
   }

-  it should "create multiple versions of an object and successfully clean it with deleteBucketContents" in withBucketWithVersioning {
+  it should "create multiple versions of an object and successfully clean it with deleteBucketContents" in
+    withBucketWithVersioning {
       bucketWithVersioning =>
         // TODO: Figure out a way to properly test this with Minio, see https://github.com/akka/alpakka/issues/2750
         assume(this.isInstanceOf[AWSS3IntegrationSpec])
diff --git a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/javadsl/SqsPublishFlow.scala b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/javadsl/SqsPublishFlow.scala
index 902f1dbf9..c5b32b12e 100644
--- a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/javadsl/SqsPublishFlow.scala
+++ b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/javadsl/SqsPublishFlow.scala
@@ -17,11 +17,7 @@ import org.apache.pekko
 import pekko.NotUsed
 import pekko.annotation.ApiMayChange
 import pekko.stream.connectors.sqs.{
-  SqsPublishBatchSettings,
-  SqsPublishGroupedSettings,
-  SqsPublishResult,
-  SqsPublishResultEntry,
-  SqsPublishSettings
+  SqsPublishBatchSettings, SqsPublishGroupedSettings, SqsPublishResult, SqsPublishResultEntry, SqsPublishSettings
 }
 import pekko.stream.javadsl.Flow
 import pekko.stream.scaladsl.{ Flow => SFlow }
diff --git a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsAckFlow.scala b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsAckFlow.scala
index 426bb519b..8f4baea98 100644
--- a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsAckFlow.scala
+++ b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsAckFlow.scala
@@ -125,7 +125,8 @@ object SqsAckFlow {
             .build()
         }

-        actions -> DeleteMessageBatchRequest
+        actions ->
+        DeleteMessageBatchRequest
           .builder()
           .queueUrl(queueUrl)
           .entries(entries.asJava)
@@ -178,7 +179,8 @@ object SqsAckFlow {
             .build()
         }

-        actions -> ChangeMessageVisibilityBatchRequest
+        actions ->
+        ChangeMessageVisibilityBatchRequest
           .builder()
           .queueUrl(queueUrl)
           .entries(entries.asJava)
diff --git a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishFlow.scala b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishFlow.scala
index 8875eedbb..4b34d07f1 100644
--- a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishFlow.scala
+++ b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishFlow.scala
@@ -98,7 +98,8 @@ object SqsPublishFlow {
             .build()
         }

-        requests -> SendMessageBatchRequest
+        requests ->
+        SendMessageBatchRequest
           .builder()
           .queueUrl(queueUrl)
           .entries(entries.toList.asJava)
diff --git a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/testkit/MessageFactory.scala b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/testkit/MessageFactory.scala
index 6566bdfa8..2599ec223 100644
--- a/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/testkit/MessageFactory.scala
+++ b/sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/testkit/MessageFactory.scala
@@ -16,9 +16,7 @@ package org.apache.pekko.stream.connectors.sqs.testkit
 import org.apache.pekko
 import pekko.stream.connectors.sqs.SqsAckResult.{ SqsChangeMessageVisibilityResult, SqsDeleteResult, SqsIgnoreResult }
 import pekko.stream.connectors.sqs.SqsAckResultEntry.{
-  SqsChangeMessageVisibilityResultEntry,
-  SqsDeleteResultEntry,
-  SqsIgnoreResultEntry
+  SqsChangeMessageVisibilityResultEntry, SqsDeleteResultEntry, SqsIgnoreResultEntry
 }
 import pekko.stream.connectors.sqs.{ MessageAction, SqsPublishResult, SqsPublishResultEntry, SqsResult }
 import software.amazon.awssdk.services.sqs.model._
diff --git a/sqs/src/test/scala/docs/scaladsl/SqsPublishSpec.scala b/sqs/src/test/scala/docs/scaladsl/SqsPublishSpec.scala
index 2e2da913e..21d096772 100644
--- a/sqs/src/test/scala/docs/scaladsl/SqsPublishSpec.scala
+++ b/sqs/src/test/scala/docs/scaladsl/SqsPublishSpec.scala
@@ -260,7 +260,8 @@ class SqsPublishSpec extends AnyFlatSpec with Matchers with DefaultTestContext w
     }
   }

-  ignore should "put message in a flow, then pass the result further with fifo queues" taggedAs Integration in new IntegrationFixture(
+  ignore should "put message in a flow, then pass the result further with fifo queues" taggedAs Integration in
+    new IntegrationFixture(
       fifo = true) {
     // elasticmq does not provide proper fifo support (see https://github.com/adamw/elasticmq/issues/92)
     // set your fifo sqs queue url and awsSqsClient manually
@@ -286,7 +287,8 @@ class SqsPublishSpec extends AnyFlatSpec with Matchers with DefaultTestContext w
     receiveMessage().body() shouldBe "connectors"
   }

-  ignore should "put message in a flow, batch, then pass the result further with fifo queues" taggedAs Integration in new IntegrationFixture(
+  ignore should "put message in a flow, batch, then pass the result further with fifo queues" taggedAs Integration in
+    new IntegrationFixture(
       fifo = true) {
     // elasticmq does not provide proper fifo support (see https://github.com/adamw/elasticmq/issues/92)
     // set your fifo sqs queue url and awsSqsClient manually
diff --git a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
index 6673b3312..c71d05dbe 100644
--- a/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
+++ b/sqs/src/test/scala/docs/scaladsl/SqsSourceSpec.scala
@@ -32,11 +32,7 @@ import software.amazon.awssdk.http.async.SdkAsyncHttpClient
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.sqs.SqsAsyncClient
 import software.amazon.awssdk.services.sqs.model.{
-  Message,
-  MessageAttributeValue,
-  MessageSystemAttributeName,
-  QueueDoesNotExistException,
-  SendMessageRequest
+  Message, MessageAttributeValue, MessageSystemAttributeName, QueueDoesNotExistException, SendMessageRequest
 }

 import scala.collection.immutable
@@ -120,7 +116,8 @@ class SqsSourceSpec extends AnyFlatSpec with ScalaFutures with Matchers with Def
       val future = SqsSource(queueUrl, settings).runWith(Sink.head)
       private val message: Message = future.futureValue

-      message.attributes().keySet.asScala should contain theSameElementsAs allAvailableAttributes
+      message.attributes().keySet.asScala should contain theSameElementsAs
+      allAvailableAttributes
         .map(attr => MessageSystemAttributeName.fromValue(attr.name))
     }
   }
@@ -164,7 +161,8 @@ class SqsSourceSpec extends AnyFlatSpec with ScalaFutures with Matchers with Def
       val future = SqsSource(queueUrl, settings).runWith(Sink.head)
       private val message: Message = future.futureValue

-      message.attributes().keySet.asScala should contain theSameElementsAs attributes
+      message.attributes().keySet.asScala should contain theSameElementsAs
+      attributes
         .map(_.name)
         .map(MessageSystemAttributeName.fromValue)
     }
diff --git a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
index b5cbf5bfe..212ffb897 100644
--- a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
+++ b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
@@ -20,9 +20,7 @@ import pekko.annotation.InternalApi
 import pekko.event.{ Logging, LoggingAdapter }
 import pekko.stream._
 import pekko.stream.connectors.unixdomainsocket.scaladsl.UnixDomainSocket.{
-  IncomingConnection,
-  OutgoingConnection,
-  ServerBinding
+  IncomingConnection, OutgoingConnection, ServerBinding
 }
 import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source, SourceQueueWithComplete }
 import pekko.util.ByteString
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
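For anyone reviewing the patch above: every hunk follows one of two mechanical rewrites applied by scalafmt 3.10.4 under the repository's .scalafmt.conf. First, import selectors that fit within the column limit are bin-packed onto one line instead of listed one per line. Second, an infix expression whose single line would exceed the column limit is now broken immediately after the infix operator (`should`, `in`, `->`), leaving the operands intact. The sketch below illustrates the second rewrite on invented names; the exact column limit comes from the project's scalafmt configuration, which is not part of this commit, so treat it as an assumption.

// A minimal, self-contained sketch of the infix line break. The builder
// chain is a stand-in for the AWS SDK builders seen in SqsAckFlow; only
// the formatting behaviour is being illustrated, not the connector API.
object InfixBreakSketch {
  def main(args: Array[String]): Unit = {
    val actions = List("delete", "requeue")

    // Before scalafmt 3.10.4 this pair sat on one long line:
    //   actions -> DeleteMessageBatchRequest.builder().queueUrl(q).entries(e).build()
    // After, the break lands right after `->`, and the operand keeps its shape:
    val batch = actions ->
      new StringBuilder()
        .append("DeleteMessageBatchRequest(entries=")
        .append(actions.mkString(","))
        .append(")")
        .result()

    println(batch)
  }
}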
