This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 99312dcf9 Replace deprecated OrientDB APIs in orientdb connector 
(#1553)
99312dcf9 is described below

commit 99312dcf9ee9b132e7ed10af1858263d093ec18c
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Apr 7 09:39:12 2026 +0200

    Replace deprecated OrientDB APIs in orientdb connector (#1553)
    
    * Replace deprecated OrientDB APIs: OPartitionedDatabasePool, 
ODatabaseDocumentTx, OSQLSynchQuery
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/299c1de3-c73d-4239-bdd3-d7dbd1cd5184
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix remaining deprecation warnings: replace OServerAdmin with OrientDB, 
replace ODocument.save() with client.save()
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/dec73062-3e40-4181-a813-5aadacbc726c
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Add @OVersion fields to typed POJO test classes to fix MVCC 
OTransactionException
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/a525e280-9baf-4079-84d1-fbd47a4912f9
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Create switch-away-from-deprecated-classes.backwards.excludes
    
    * init the arraylist size
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 build.sbt                                          |  4 +-
 ...away-from-deprecated-classes.backwards.excludes | 26 ++++++++
 .../orientdb/OrientDbSourceSettings.scala          | 12 ++--
 .../orientdb/OrientDbWriteSettings.scala           | 14 ++---
 .../orientdb/impl/OrientDbFlowStage.scala          |  8 +--
 .../orientdb/impl/OrientDbSourceStage.scala        | 69 ++++++++++++++++------
 .../src/test/java/docs/javadsl/OrientDbTest.java   | 55 +++++++++--------
 .../test/scala/docs/scaladsl/OrientDbSpec.scala    | 43 +++++++-------
 8 files changed, 144 insertions(+), 87 deletions(-)

diff --git a/build.sbt b/build.sbt
index 58ef5ac7c..ef0b7de37 100644
--- a/build.sbt
+++ b/build.sbt
@@ -340,9 +340,7 @@ lazy val orientdb =
     "orientdb",
     "orientdb",
     Dependencies.OrientDB,
-    Test / fork := true,
-    // note: orientdb client needs to be refactored to move off deprecated 
calls
-    fatalWarnings := false)
+    Test / fork := true)
 
 lazy val reference = internalProject("reference", Dependencies.Reference)
   .dependsOn(testkit % Test)
diff --git 
a/orientdb/src/main/mima-filters/2.0.x.backward.excludes/switch-away-from-deprecated-classes.backwards.excludes
 
b/orientdb/src/main/mima-filters/2.0.x.backward.excludes/switch-away-from-deprecated-classes.backwards.excludes
new file mode 100644
index 000000000..21194c58b
--- /dev/null
+++ 
b/orientdb/src/main/mima-filters/2.0.x.backward.excludes/switch-away-from-deprecated-classes.backwards.excludes
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# switch away from deprecated orientdb classes
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.create")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.apply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.oDatabasePool")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.withOrientDBCredentials")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.create")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.apply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.oDatabasePool")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.withOrientDBCredentials")
diff --git 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
index 4b0ef46d6..a2afda5bb 100644
--- 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
+++ 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
@@ -13,21 +13,21 @@
 
 package org.apache.pekko.stream.connectors.orientdb
 
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool
+import com.orientechnologies.orient.core.db.ODatabasePool
 
 final class OrientDbSourceSettings private (
-    val oDatabasePool: 
com.orientechnologies.orient.core.db.OPartitionedDatabasePool,
+    val oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool,
     val skip: Int,
     val limit: Int) {
 
   def withOrientDBCredentials(
-      value: com.orientechnologies.orient.core.db.OPartitionedDatabasePool): 
OrientDbSourceSettings =
+      value: com.orientechnologies.orient.core.db.ODatabasePool): 
OrientDbSourceSettings =
     copy(oDatabasePool = value)
   def withSkip(value: Int): OrientDbSourceSettings = copy(skip = value)
   def withLimit(value: Int): OrientDbSourceSettings = copy(limit = value)
 
   private def copy(
-      oDatabasePool: 
com.orientechnologies.orient.core.db.OPartitionedDatabasePool = oDatabasePool,
+      oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool = 
oDatabasePool,
       skip: Int = skip,
       limit: Int = limit): OrientDbSourceSettings = new OrientDbSourceSettings(
     oDatabasePool = oDatabasePool,
@@ -45,11 +45,11 @@ final class OrientDbSourceSettings private (
 object OrientDbSourceSettings {
 
   /** Scala API */
-  def apply(oDatabasePool: OPartitionedDatabasePool): OrientDbSourceSettings = 
new OrientDbSourceSettings(
+  def apply(oDatabasePool: ODatabasePool): OrientDbSourceSettings = new 
OrientDbSourceSettings(
     oDatabasePool,
     skip = 0,
     limit = 10)
 
   /** Java API */
-  def create(oDatabasePool: OPartitionedDatabasePool): OrientDbSourceSettings 
= apply(oDatabasePool)
+  def create(oDatabasePool: ODatabasePool): OrientDbSourceSettings = 
apply(oDatabasePool)
 }
diff --git 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
index 49a9c70d6..3e27231e1 100644
--- 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
+++ 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
@@ -13,17 +13,17 @@
 
 package org.apache.pekko.stream.connectors.orientdb
 
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool
+import com.orientechnologies.orient.core.db.ODatabasePool
 
 final class OrientDbWriteSettings private (
-    val oDatabasePool: 
com.orientechnologies.orient.core.db.OPartitionedDatabasePool) {
+    val oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool) {
 
   def withOrientDBCredentials(
-      value: com.orientechnologies.orient.core.db.OPartitionedDatabasePool): 
OrientDbWriteSettings =
+      value: com.orientechnologies.orient.core.db.ODatabasePool): 
OrientDbWriteSettings =
     copy(oDatabasePool = value)
 
   private def copy(
-      oDatabasePool: 
com.orientechnologies.orient.core.db.OPartitionedDatabasePool): 
OrientDbWriteSettings =
+      oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool): 
OrientDbWriteSettings =
     new OrientDbWriteSettings(
       oDatabasePool = oDatabasePool)
 
@@ -36,10 +36,10 @@ final class OrientDbWriteSettings private (
 object OrientDbWriteSettings {
 
   /** Scala API */
-  def apply(oDatabasePool: OPartitionedDatabasePool): OrientDbWriteSettings =
+  def apply(oDatabasePool: ODatabasePool): OrientDbWriteSettings =
     new OrientDbWriteSettings(
-      oDatabasePool: OPartitionedDatabasePool)
+      oDatabasePool: ODatabasePool)
 
   /** Java API */
-  def create(oDatabasePool: OPartitionedDatabasePool): OrientDbWriteSettings = 
apply(oDatabasePool)
+  def create(oDatabasePool: ODatabasePool): OrientDbWriteSettings = 
apply(oDatabasePool)
 }
diff --git 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
index 11a2ad82d..6230f42f4 100644
--- 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
+++ 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
@@ -19,7 +19,8 @@ import pekko.stream._
 import pekko.stream.connectors.orientdb.{ OrientDbWriteMessage, 
OrientDbWriteSettings }
 import pekko.stream.stage._
 import com.orientechnologies.orient.`object`.db.OObjectDatabaseTx
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal
+import com.orientechnologies.orient.core.db.ODatabaseSession
 import com.orientechnologies.orient.core.record.ORecord
 import com.orientechnologies.orient.core.record.impl.ODocument
 import com.orientechnologies.orient.core.tx.OTransaction
@@ -54,13 +55,12 @@ private[orientdb] class OrientDbFlowStage[T, C](
 
   sealed abstract class OrientDbLogic extends GraphStageLogic(shape) with 
InHandler with OutHandler {
 
-    protected var client: ODatabaseDocumentTx = _
+    protected var client: ODatabaseSession = _
     protected var oObjectClient: OObjectDatabaseTx = _
 
     override def preStart(): Unit = {
       client = settings.oDatabasePool.acquire()
-      oObjectClient = new OObjectDatabaseTx(client)
-      client.setDatabaseOwner(oObjectClient)
+      oObjectClient = new 
OObjectDatabaseTx(client.asInstanceOf[ODatabaseDocumentInternal])
     }
 
     override def postStop(): Unit = {
diff --git 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
index 4c83e694b..0b91a2655 100644
--- 
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
+++ 
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
@@ -21,10 +21,11 @@ import pekko.stream.connectors.orientdb.{ 
OrientDbReadResult, OrientDbSourceSett
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
 import pekko.stream.{ ActorAttributes, Attributes, Outlet, SourceShape }
 import com.orientechnologies.orient.`object`.db.OObjectDatabaseTx
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx
-import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal
+import com.orientechnologies.orient.core.db.ODatabaseSession
 
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
 
 /**
  * INTERNAL API
@@ -48,15 +49,25 @@ private[orientdb] final class 
OrientDbSourceStage[T](className: String,
         query match {
           case Some(q) =>
             new Logic {
-              override protected def runQuery(): util.List[T] =
-                client.query[util.List[T]](new OSQLSynchQuery[T](q))
-
+              override protected def runQuery(): util.List[T] = {
+                val rs = client.query(q)
+                val results = newArrayListWithSize(rs.estimateSize())
+                try {
+                  while (rs.hasNext) 
results.add(rs.next().toElement.asInstanceOf[T])
+                } finally rs.close()
+                results
+              }
             }
           case None =>
             new Logic {
-              override protected def runQuery(): util.List[T] =
-                client.query[util.List[T]](
-                  new OSQLSynchQuery[T](s"SELECT * FROM $className SKIP 
${skip} LIMIT ${settings.limit}"))
+              override protected def runQuery(): util.List[T] = {
+                val rs = client.query(s"SELECT * FROM $className SKIP ${skip} 
LIMIT ${settings.limit}")
+                val results = newArrayListWithSize(rs.estimateSize())
+                try {
+                  while (rs.hasNext) 
results.add(rs.next().toElement.asInstanceOf[T])
+                } finally rs.close()
+                results
+              }
             }
         }
 
@@ -70,9 +81,16 @@ private[orientdb] final class 
OrientDbSourceStage[T](className: String,
               }
 
               override protected def runQuery(): util.List[T] = {
-                client.setDatabaseOwner(oObjectClient)
-                oObjectClient.getEntityManager.registerEntityClass(c)
-                oObjectClient.query[util.List[T]](new OSQLSynchQuery[T](q))
+                val rs = oObjectClient.query(q)
+                val results = newArrayListWithSize(rs.estimateSize())
+                try {
+                  while (rs.hasNext) {
+                    rs.next().getRecord().toScala.foreach { record =>
+                      results.add(oObjectClient.getUserObjectByRecord(record, 
null).asInstanceOf[T])
+                    }
+                  }
+                } finally rs.close()
+                results
               }
             }
           case None =>
@@ -82,26 +100,39 @@ private[orientdb] final class 
OrientDbSourceStage[T](className: String,
                 oObjectClient.getEntityManager.registerEntityClass(c)
               }
 
-              override protected def runQuery(): util.List[T] =
-                oObjectClient
-                  .query[util.List[T]](
-                    new OSQLSynchQuery[T](
-                      s"SELECT * FROM $className SKIP ${skip} LIMIT 
${settings.limit}"))
+              override protected def runQuery(): util.List[T] = {
+                val rs =
+                  oObjectClient.query(s"SELECT * FROM $className SKIP ${skip} 
LIMIT ${settings.limit}")
+                val results = newArrayListWithSize(rs.estimateSize())
+                try {
+                  while (rs.hasNext) {
+                    rs.next().getRecord().toScala.foreach { record =>
+                      results.add(oObjectClient.getUserObjectByRecord(record, 
null).asInstanceOf[T])
+                    }
+                  }
+                } finally rs.close()
+                results
+              }
             }
         }
 
     }
 
+  // if the size is larger than Int.MaxValue, we will just create an ArrayList 
with a default
+  // size of 1000 and let it grow as needed - we assume that estimateSize() is 
just a hint
+  private def newArrayListWithSize(size: Long): util.ArrayList[T] =
+    if (size > Int.MaxValue) new util.ArrayList[T](1000)
+    else new util.ArrayList[T](math.max(size.toInt, 0))
+
   private abstract class Logic extends GraphStageLogic(shape) with OutHandler {
 
-    protected var client: ODatabaseDocumentTx = _
+    protected var client: ODatabaseSession = _
     protected var oObjectClient: OObjectDatabaseTx = _
     protected var skip = settings.skip
 
     override def preStart(): Unit = {
       client = settings.oDatabasePool.acquire()
-      oObjectClient = new OObjectDatabaseTx(client)
-      client.setDatabaseOwner(oObjectClient)
+      oObjectClient = new 
OObjectDatabaseTx(client.asInstanceOf[ODatabaseDocumentInternal])
     }
 
     override def postStop(): Unit =
diff --git a/orientdb/src/test/java/docs/javadsl/OrientDbTest.java 
b/orientdb/src/test/java/docs/javadsl/OrientDbTest.java
index d602d85db..ad516acb5 100644
--- a/orientdb/src/test/java/docs/javadsl/OrientDbTest.java
+++ b/orientdb/src/test/java/docs/javadsl/OrientDbTest.java
@@ -26,12 +26,16 @@ import 
org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
 import org.apache.pekko.stream.javadsl.Sink;
 import org.apache.pekko.stream.javadsl.Source;
 import org.apache.pekko.testkit.javadsl.TestKit;
-import com.orientechnologies.orient.client.remote.OServerAdmin;
+import com.orientechnologies.orient.core.annotation.OVersion;
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
 import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
+import com.orientechnologies.orient.core.db.ODatabaseType;
+import com.orientechnologies.orient.core.db.OrientDB;
+import com.orientechnologies.orient.core.db.OrientDBConfig;
 // #init-settings
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool;
+import com.orientechnologies.orient.core.db.ODatabasePool;
 // #init-settings
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
+import com.orientechnologies.orient.core.db.ODatabaseSession;
 import com.orientechnologies.orient.core.record.impl.ODocument;
 import com.orientechnologies.orient.object.db.OObjectDatabaseTx;
 import org.junit.AfterClass;
@@ -54,16 +58,15 @@ import static org.junit.Assert.assertEquals;
 public class OrientDbTest {
   @Rule public final LogCapturingJunit4 logCapturing = new 
LogCapturingJunit4();
 
-  private static OServerAdmin oServerAdmin;
-  private static OPartitionedDatabasePool oDatabase;
-  private static ODatabaseDocumentTx client;
+  private static OrientDB orientDB;
+  private static ODatabasePool oDatabase;
+  private static ODatabaseSession client;
   private static ActorSystem system;
 
   // #init-settings
 
   private static String url = "remote:127.0.0.1:2424/";
   private static String dbName = "GratefulDeadConcertsJava";
-  private static String dbUrl = url + dbName;
   private static String username = "root";
   private static String password = "root";
   // #init-settings
@@ -78,6 +81,7 @@ public class OrientDbTest {
   public static class source1 {
 
     private String book_title;
+    @OVersion private Integer version;
 
     public void setBook_title(String book_title) {
       this.book_title = book_title;
@@ -91,6 +95,7 @@ public class OrientDbTest {
   public static class sink2 {
 
     private String book_title;
+    @OVersion private Integer version;
 
     public void setBook_title(String book_title) {
       this.book_title = book_title;
@@ -151,16 +156,12 @@ public class OrientDbTest {
   public static void setup() throws Exception {
     system = ActorSystem.create();
 
-    oServerAdmin = new OServerAdmin(url).connect(username, password);
-    if (!oServerAdmin.existsDatabase(dbName, "plocal")) {
-      oServerAdmin.createDatabase(dbName, "document", "plocal");
-    }
+    orientDB = new OrientDB(url, username, password, 
OrientDBConfig.defaultConfig());
+    orientDB.createIfNotExists(dbName, ODatabaseType.PLOCAL);
 
     // #init-settings
 
-    oDatabase =
-        new OPartitionedDatabasePool(
-            dbUrl, username, password, 
Runtime.getRuntime().availableProcessors(), 10);
+    oDatabase = new ODatabasePool(orientDB, dbName, username, password);
 
     system.registerOnTermination(() -> oDatabase.close());
     // #init-settings
@@ -185,13 +186,12 @@ public class OrientDbTest {
     unregister(sink3);
     unregister(sink6);
 
-    if (oServerAdmin.existsDatabase(dbName, "plocal")) {
-      oServerAdmin.dropDatabase(dbName, "plocal");
-    }
-    oServerAdmin.close();
-
     client.close();
     oDatabase.close();
+    if (orientDB.exists(dbName)) {
+      orientDB.drop(dbName);
+    }
+    orientDB.close();
     TestKit.shutdownActorSystem(system);
   }
 
@@ -204,7 +204,7 @@ public class OrientDbTest {
   private static void flush(String className, String fieldName, String 
fieldValue) {
     ODocument oDocument = new ODocument().field(fieldName, fieldValue);
     oDocument.setClassNameIfExists(className);
-    oDocument.save();
+    client.save(oDocument);
   }
 
   private static void unregister(String className) {
@@ -274,8 +274,9 @@ public class OrientDbTest {
                 sourceClass, OrientDbSourceSettings.create(oDatabase), 
source1.class, null)
             .map(
                 readResult -> {
-                  ODatabaseDocumentTx db = oDatabase.acquire();
-                  db.setDatabaseOwner(new OObjectDatabaseTx(db));
+                  ODatabaseDocumentInternal db =
+                      (ODatabaseDocumentInternal) oDatabase.acquire();
+                  new OObjectDatabaseTx(db);
                   ODatabaseRecordThreadLocal.instance().set(db);
                   sink2 sink = new sink2();
                   sink.setBook_title(readResult.oDocument().getBook_title());
@@ -295,8 +296,9 @@ public class OrientDbTest {
                 sinkClass2, OrientDbSourceSettings.create(oDatabase), 
sink2.class, null)
             .map(
                 m -> {
-                  ODatabaseDocumentTx db = oDatabase.acquire();
-                  db.setDatabaseOwner(new OObjectDatabaseTx(db));
+                  ODatabaseDocumentInternal db =
+                      (ODatabaseDocumentInternal) oDatabase.acquire();
+                  new OObjectDatabaseTx(db);
                   ODatabaseRecordThreadLocal.instance().set(db);
                   return m.oDocument().getBook_title();
                 })
@@ -351,8 +353,9 @@ public class OrientDbTest {
         .via(OrientDbFlow.createWithPassThrough(sink6, 
OrientDbWriteSettings.create(oDatabase)))
         .map(
             messages -> {
-              ODatabaseDocumentTx db = oDatabase.acquire();
-              db.setDatabaseOwner(new OObjectDatabaseTx(db));
+              ODatabaseDocumentInternal db =
+                  (ODatabaseDocumentInternal) oDatabase.acquire();
+              new OObjectDatabaseTx(db);
               ODatabaseRecordThreadLocal.instance().set(db);
               messages.stream().forEach(message -> 
commitToKafka.accept(message.passThrough()));
               return NotUsed.getInstance();
diff --git a/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala 
b/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala
index f4dff41f9..c6953f620 100644
--- a/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala
+++ b/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala
@@ -28,10 +28,13 @@ import pekko.stream.scaladsl.{ Sink, Source }
 import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
 import pekko.testkit.TestKit
 import com.orientechnologies.orient.`object`.db.OObjectDatabaseTx
-import com.orientechnologies.orient.client.remote.OServerAdmin
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal
+import com.orientechnologies.orient.core.db.ODatabaseSession
+import com.orientechnologies.orient.core.db.ODatabaseType
+import com.orientechnologies.orient.core.db.OrientDB
+import com.orientechnologies.orient.core.db.OrientDBConfig
 //#init-settings
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool
+import com.orientechnologies.orient.core.db.ODatabasePool
 //#init-settings
 import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal
 import com.orientechnologies.orient.core.record.impl.ODocument
@@ -55,7 +58,6 @@ class OrientDbSpec extends AnyWordSpec with Matchers with 
BeforeAndAfterAll with
 
   val url = "remote:127.0.0.1:2424/"
   val dbName = "GratefulDeadConcertsScala"
-  val dbUrl = s"$url$dbName"
   val username = "root"
   val password = "root"
   // #init-settings
@@ -70,20 +72,18 @@ class OrientDbSpec extends AnyWordSpec with Matchers with 
BeforeAndAfterAll with
   case class Book(title: String)
   // #define-class
 
-  var oServerAdmin: OServerAdmin = _
-  var oDatabase: OPartitionedDatabasePool = _
-  var client: ODatabaseDocumentTx = _
+  var orientDB: OrientDB = _
+  var oDatabase: ODatabasePool = _
+  var client: ODatabaseSession = _
 
   override def beforeAll() = {
-    oServerAdmin = new OServerAdmin(url).connect(username, password)
-    if (!oServerAdmin.existsDatabase(dbName, "plocal")) {
-      oServerAdmin.createDatabase(dbName, "document", "plocal")
-    }
+    orientDB = new OrientDB(url, username, password, 
OrientDBConfig.defaultConfig())
+    orientDB.createIfNotExists(dbName, ODatabaseType.PLOCAL)
 
     // #init-settings
 
-    val oDatabase: OPartitionedDatabasePool =
-      new OPartitionedDatabasePool(dbUrl, username, password, 
Runtime.getRuntime.availableProcessors(), 10)
+    val oDatabase: ODatabasePool =
+      new ODatabasePool(orientDB, dbName, username, password)
 
     system.registerOnTermination(() -> oDatabase.close())
     // #init-settings
@@ -107,13 +107,12 @@ class OrientDbSpec extends AnyWordSpec with Matchers with 
BeforeAndAfterAll with
     unregister(sink5)
     unregister(sink7)
 
-    if (oServerAdmin.existsDatabase(dbName, "plocal")) {
-      oServerAdmin.dropDatabase(dbName, "plocal")
-    }
-    oServerAdmin.close()
-
     client.close()
     oDatabase.close()
+    if (orientDB.exists(dbName)) {
+      orientDB.drop(dbName)
+    }
+    orientDB.close()
     TestKit.shutdownActorSystem(system)
   }
 
@@ -125,7 +124,7 @@ class OrientDbSpec extends AnyWordSpec with Matchers with 
BeforeAndAfterAll with
     val oDocument = new ODocument()
       .field(fieldName, fieldValue)
     oDocument.setClassNameIfExists(className)
-    oDocument.save()
+    client.save(oDocument)
   }
 
   private def unregister(className: String): Unit =
@@ -232,9 +231,9 @@ class OrientDbSpec extends AnyWordSpec with Matchers with 
BeforeAndAfterAll with
       val streamCompletion: Future[Done] = OrientDbSource
         .typed(sourceClass, OrientDbSourceSettings(oDatabase), 
classOf[OrientDbTest.source1])
         .map { (m: OrientDbReadResult[OrientDbTest.source1]) =>
-          val db: ODatabaseDocumentTx = oDatabase.acquire
-          db.setDatabaseOwner(new OObjectDatabaseTx(db))
-          ODatabaseRecordThreadLocal.instance.set(db)
+          val db = oDatabase.acquire
+          new OObjectDatabaseTx(db.asInstanceOf[ODatabaseDocumentInternal])
+          
ODatabaseRecordThreadLocal.instance.set(db.asInstanceOf[ODatabaseDocumentInternal])
           val sink: OrientDbTest.sink2 = new OrientDbTest.sink2
           sink.setBook_title(m.oDocument.getBook_title)
           OrientDbWriteMessage(sink)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to