This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.1.x by this push:
     new c229413921 tidy up metrics exchange (#1899) (#1907)
c229413921 is described below

commit c2294139218314472c82fa8b2b3a2459f73b20f6
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Jun 17 18:33:22 2025 +0100

    tidy up metrics exchange (#1899) (#1907)
    
    * tidy up metrics exchange
    
    Update NumberInputStream.scala
    
    scalafmt
    
    Update NumberInputStream.scala
    
    partial test
    
    scalafmt
    
    Update NumberInputStreamSpec.scala
    
    Update NumberInputStreamSpec.scala
    
    Update NumberInputStreamSpec.scala
    
    * Update NumberInputStreamSpec.scala
    
    * use system.dynamicAccess.classLoader
    
    * scalafmt
    
    * Update NumberInputStream.scala
---
 .../metrics/protobuf/MessageSerializer.scala       |  3 +-
 .../metrics/protobuf/NumberInputStream.scala       | 56 ++++++++++++++
 .../metrics/protobuf/NumberInputStreamSpec.scala   | 85 ++++++++++++++++++++++
 3 files changed, 142 insertions(+), 2 deletions(-)

diff --git 
a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
 
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
index 8dd24134c4..57109a90bf 100644
--- 
a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
+++ 
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
@@ -27,7 +27,6 @@ import pekko.dispatch.Dispatchers
 import pekko.protobufv3.internal.MessageLite
 import pekko.remote.ByteStringUtils
 import pekko.serialization.{ BaseSerializer, SerializationExtension, 
SerializerWithStringManifest, Serializers }
-import pekko.util.ClassLoaderObjectInputStream
 import pekko.util.ccompat._
 import pekko.util.ccompat.JavaConverters._
 
@@ -263,7 +262,7 @@ class MessageSerializer(val system: ExtendedActorSystem) 
extends SerializerWithS
         case NumberType.Float_VALUE   => 
jl.Float.intBitsToFloat(number.getValue32)
         case NumberType.Integer_VALUE => number.getValue32
         case NumberType.Serialized_VALUE =>
-          val in = new ClassLoaderObjectInputStream(
+          val in = new NumberInputStream(
             system.dynamicAccess.classLoader,
             new ByteArrayInputStream(number.getSerialized.toByteArray))
           val obj = in.readObject
diff --git 
a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/NumberInputStream.scala
 
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/NumberInputStream.scala
new file mode 100644
index 0000000000..04ca1c0cc9
--- /dev/null
+++ 
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/NumberInputStream.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.cluster.metrics.protobuf
+
+import java.io.{ InputStream, ObjectInputStream, ObjectStreamClass }
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+
+/**
+ * A special ObjectInputStream that will only load built-in primitives or
+ * Number classes.
+ * <p>
+ * This is for internal Pekko use only, and is not intended for public use.
+ * </p>
+ */
+@InternalApi
+private[protobuf] class NumberInputStream(
+    classLoader: ClassLoader,
+    inputStream: InputStream) extends ObjectInputStream(inputStream) {
+
+  /**
+   * Resolve a class specified by the descriptor using the provided classloader
+   * and that treats any class that is not a primitive or a subclass of
+   * <code>java.lang.Number</code> as not found.
+   *
+   * @param objectStreamClass  descriptor of the class
+   * @return the Class object described by the ObjectStreamClass
+   * @throws ClassNotFoundException if the Class cannot be found (or is 
rejected)
+   */
+  override protected def resolveClass(objectStreamClass: ObjectStreamClass): 
Class[_] = {
+    val clazz = Class.forName(objectStreamClass.getName(), false, classLoader)
+    if (clazz.isPrimitive() || classOf[Number].isAssignableFrom(clazz)) {
+      clazz
+    } else {
+      throw new ClassNotFoundException(
+        s"Class rejected: ${objectStreamClass.getName()} (only primitive types 
and subclasses of java.lang.Number are allowed)")
+    }
+  }
+
+}
diff --git 
a/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/protobuf/NumberInputStreamSpec.scala
 
b/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/protobuf/NumberInputStreamSpec.scala
new file mode 100644
index 0000000000..5713433416
--- /dev/null
+++ 
b/cluster-metrics/src/test/scala/org/apache/pekko/cluster/metrics/protobuf/NumberInputStreamSpec.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.cluster.metrics.protobuf
+
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, 
ObjectOutputStream }
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+object NumberInputStreamSpec {
+  case class TestObject(value: String) extends Serializable
+}
+
+class NumberInputStreamSpec extends AnyWordSpec with Matchers {
+
+  import NumberInputStreamSpec._
+  val classLoader = classOf[NumberInputStreamSpec].getClassLoader
+
+  "NumberInputStream" must {
+
+    "resolve java Integer" in {
+
+      val i = 42
+      val bos = new ByteArrayOutputStream()
+      val oos = new ObjectOutputStream(bos)
+      oos.writeObject(Integer.valueOf(i))
+      oos.close()
+
+      val inputStream = new ByteArrayInputStream(bos.toByteArray)
+      val numberInputStream = new NumberInputStream(classLoader, inputStream)
+
+      val result = numberInputStream.readObject()
+      numberInputStream.close()
+      result shouldBe an[Integer]
+      result.asInstanceOf[Integer].intValue() shouldEqual i
+    }
+
+    "resolve scala Int" in {
+
+      val i = 42
+      val bos = new ByteArrayOutputStream()
+      val oos = new ObjectOutputStream(bos)
+      oos.writeObject(i)
+      oos.close()
+
+      val inputStream = new ByteArrayInputStream(bos.toByteArray)
+      val numberInputStream = new NumberInputStream(classLoader, inputStream)
+
+      val result = numberInputStream.readObject()
+      numberInputStream.close()
+      result shouldBe an[Int]
+      result.asInstanceOf[Int] shouldEqual i
+    }
+
+    "throw ClassNotFoundException for non-Number classes" in {
+      val bos = new ByteArrayOutputStream()
+      val oos = new ObjectOutputStream(bos)
+      oos.writeObject(TestObject("NotANumber"))
+      oos.close()
+
+      val inputStream = new ByteArrayInputStream(bos.toByteArray)
+      val numberInputStream = new NumberInputStream(classLoader, inputStream)
+
+      a[ClassNotFoundException] should be thrownBy {
+        numberInputStream.readObject()
+      }
+      numberInputStream.close()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to