This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch migrate/xml-parse-with-context-2935
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git

commit bc127ff1289a9c76a84b5f90663a32877cf4cf5c
Author: 虎鸣 <[email protected]>
AuthorDate: Wed Mar 11 00:46:27 2026 +0800

    feat(xml): Add context-aware XML parsing (parserWithContext API)
    
    Migrated from upstream akka/alpakka commit b813e7fb (alpakka#2935),
    which is now Apache licensed.
    
    This adds a new parserWithContext[Ctx]() API to the XML module that
    allows attaching context information to XML parse events. The context
    flows through the parsing pipeline via FlowWithContext, enabling
    use cases such as tracking line numbers, source positions, or any
    arbitrary metadata alongside parsed XML events.
    
    Changes:
    - StreamingXmlParser: Generalized to support context via ContextHandler
      type class (uncontextual for backward compat, contextual for new API)
    - scaladsl.XmlParsing: Added parserWithContext[Ctx]() method
    - javadsl.XmlParsing: Added parserWithContext[Ctx]() overloads (3 variants)
    - XmlProcessingSpec: Added test for context-aware parsing with line numbers
    
    All existing parser APIs remain backward compatible.
    
    Upstream: https://github.com/akka/alpakka/commit/b813e7fb772153edf14f7e5997abad72f5acef07
    
    Co-authored-by: Copilot <[email protected]>
---
 .gitmodules                                        |  3 +
 .upstream-alpakka                                  |  1 +
 .../connectors/xml/impl/StreamingXmlParser.scala   | 70 +++++++++++++++++-----
 .../stream/connectors/xml/javadsl/XmlParsing.scala | 31 ++++++++++
 .../connectors/xml/scaladsl/XmlParsing.scala       | 25 +++++++-
 .../scala/docs/scaladsl/XmlProcessingSpec.scala    | 38 +++++++++++-
 6 files changed, 149 insertions(+), 19 deletions(-)

diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 000000000..d8999e6c9
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule ".upstream-alpakka"]
+       path = .upstream-alpakka
+       url = https://github.com/akka/alpakka.git
diff --git a/.upstream-alpakka b/.upstream-alpakka
new file mode 160000
index 000000000..b0becb5d3
--- /dev/null
+++ b/.upstream-alpakka
@@ -0,0 +1 @@
+Subproject commit b0becb5d3ec28aa06476a32c961e764fd488ac3b
diff --git 
a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
 
b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
index 05f12cf1f..94e5e84ee 100644
--- 
a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
+++ 
b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlParser.scala
@@ -28,21 +28,53 @@ import scala.annotation.tailrec
 
 private[xml] object StreamingXmlParser {
   lazy val withStreamingFinishedException = new IllegalStateException("Stream 
finished before event was fully parsed.")
+
+  /**
+   * Type class that handles extracting and re-attaching context information
+   * during XML parsing. This enables context-aware parsing where each parse
+   * event can carry additional context from the input side.
+   */
+  sealed trait ContextHandler[A, B, Ctx] {
+    def getByteString(a: A): ByteString
+    def getContext(a: A): Ctx
+    def buildOutput(pe: ParseEvent, ctx: Ctx): B
+  }
+
+  object ContextHandler {
+
+    /** Handler for standard (context-free) parsing: ByteString in, ParseEvent 
out. */
+    final val uncontextual: ContextHandler[ByteString, ParseEvent, Unit] =
+      new ContextHandler[ByteString, ParseEvent, Unit] {
+        def getByteString(a: ByteString): ByteString = a
+        def getContext(a: ByteString): Unit = ()
+        def buildOutput(pe: ParseEvent, ctx: Unit): ParseEvent = pe
+      }
+
+    /** Handler for context-aware parsing: (ByteString, Ctx) in, (ParseEvent, 
Ctx) out. */
+    final def contextual[Ctx]: ContextHandler[(ByteString, Ctx), (ParseEvent, 
Ctx), Ctx] =
+      new ContextHandler[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] {
+        def getByteString(a: (ByteString, Ctx)): ByteString = a._1
+        def getContext(a: (ByteString, Ctx)): Ctx = a._2
+        def buildOutput(pe: ParseEvent, ctx: Ctx): (ParseEvent, Ctx) = (pe, 
ctx)
+      }
+  }
 }
 
 /**
  * INTERNAL API
  */
-@InternalApi private[xml] class StreamingXmlParser(ignoreInvalidChars: Boolean,
-    configureFactory: AsyncXMLInputFactory => Unit)
-    extends GraphStage[FlowShape[ByteString, ParseEvent]] {
-  val in: Inlet[ByteString] = Inlet("XMLParser.in")
-  val out: Outlet[ParseEvent] = Outlet("XMLParser.out")
-  override val shape: FlowShape[ByteString, ParseEvent] = FlowShape(in, out)
+@InternalApi private[xml] class StreamingXmlParser[A, B, 
Ctx](ignoreInvalidChars: Boolean,
+    configureFactory: AsyncXMLInputFactory => Unit,
+    transform: StreamingXmlParser.ContextHandler[A, B, Ctx])
+    extends GraphStage[FlowShape[A, B]] {
+  val in: Inlet[A] = Inlet("XMLParser.in")
+  val out: Outlet[B] = Outlet("XMLParser.out")
+  override val shape: FlowShape[A, B] = FlowShape(in, out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
     new GraphStageLogic(shape) with InHandler with OutHandler {
       private var started: Boolean = false
+      private var context: Ctx = _
 
       import javax.xml.stream.XMLStreamConstants
 
@@ -56,7 +88,10 @@ private[xml] object StreamingXmlParser {
       setHandlers(in, out, this)
 
       override def onPush(): Unit = {
-        val array = grab(in).toArray
+        val a = grab(in)
+        val bs = transform.getByteString(a)
+        context = transform.getContext(a)
+        val array = bs.toArray
         parser.getInputFeeder.feedInput(array, 0, array.length)
         advanceParser()
       }
@@ -78,10 +113,10 @@ private[xml] object StreamingXmlParser {
 
             case XMLStreamConstants.START_DOCUMENT =>
               started = true
-              push(out, StartDocument)
+              push(out, transform.buildOutput(StartDocument, context))
 
             case XMLStreamConstants.END_DOCUMENT =>
-              push(out, EndDocument)
+              push(out, transform.buildOutput(EndDocument, context))
               completeStage()
 
             case XMLStreamConstants.START_ELEMENT =>
@@ -102,26 +137,29 @@ private[xml] object StreamingXmlParser {
               val optNs = optPrefix.flatMap(prefix => 
Option(parser.getNamespaceURI(prefix)))
               push(
                 out,
-                StartElement(parser.getLocalName,
+                transform.buildOutput(StartElement(parser.getLocalName,
                   attributes,
                   optPrefix.filterNot(_ == ""),
                   optNs.filterNot(_ == ""),
-                  namespaceCtx = namespaces))
+                  namespaceCtx = namespaces),
+                  context))
 
             case XMLStreamConstants.END_ELEMENT =>
-              push(out, EndElement(parser.getLocalName))
+              push(out, transform.buildOutput(EndElement(parser.getLocalName), 
context))
 
             case XMLStreamConstants.CHARACTERS =>
-              push(out, Characters(parser.getText))
+              push(out, transform.buildOutput(Characters(parser.getText), 
context))
 
             case XMLStreamConstants.PROCESSING_INSTRUCTION =>
-              push(out, ProcessingInstruction(Option(parser.getPITarget), 
Option(parser.getPIData)))
+              push(out,
+                
transform.buildOutput(ProcessingInstruction(Option(parser.getPITarget), 
Option(parser.getPIData)),
+                  context))
 
             case XMLStreamConstants.COMMENT =>
-              push(out, Comment(parser.getText))
+              push(out, transform.buildOutput(Comment(parser.getText), 
context))
 
             case XMLStreamConstants.CDATA =>
-              push(out, CData(parser.getText))
+              push(out, transform.buildOutput(CData(parser.getText), context))
 
             // Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, 
ENTITY_DECLARATION, PROCESSING_INSTRUCTION
             // ATTRIBUTE is handled in START_ELEMENT implicitly
diff --git 
a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
 
b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
index 3a2fca1e5..d10edd8d5 100644
--- 
a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
+++ 
b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/javadsl/XmlParsing.scala
@@ -33,12 +33,31 @@ object XmlParsing {
   def parser(): pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
     xml.scaladsl.XmlParsing.parser.asJava
 
+  /**
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   *
+   * Upstream from akka/alpakka#2935 (which is now Apache licensed).
+   */
+  def parserWithContext[Ctx](): 
pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] 
=
+    xml.scaladsl.XmlParsing.parserWithContext().asJava
+
   /**
    * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX.
    */
   def parser(ignoreInvalidChars: Boolean): 
pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
     xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava
 
+  /**
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   *
+   * Upstream from akka/alpakka#2935 (which is now Apache licensed).
+   */
+  def parserWithContext[Ctx](
+      ignoreInvalidChars: Boolean): 
pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] 
=
+    xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars).asJava
+
   /**
    * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX.
    */
@@ -54,6 +73,18 @@ object XmlParsing {
       configureFactory: Consumer[AsyncXMLInputFactory]): 
pekko.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
     xml.scaladsl.XmlParsing.parser(ignoreInvalidChars, 
configureFactory.accept(_)).asJava
 
+  /**
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   *
+   * Upstream from akka/alpakka#2935 (which is now Apache licensed).
+   */
+  def parserWithContext[Ctx](
+      ignoreInvalidChars: Boolean,
+      configureFactory: Consumer[AsyncXMLInputFactory])
+      : pekko.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, 
NotUsed] =
+    xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars, 
configureFactory.accept(_)).asJava
+
   /**
    * A Flow that transforms a stream of XML ParseEvents. This stage coalesces 
consequitive CData and Characters
    * events into a single Characters event or fails if the buffered string is 
larger than the maximum defined.
diff --git 
a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
 
b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
index 2cf101363..40d67401c 100644
--- 
a/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
+++ 
b/xml/src/main/scala/org/apache/pekko/stream/connectors/xml/scaladsl/XmlParsing.scala
@@ -17,7 +17,7 @@ import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream.connectors.xml.ParseEvent
 import pekko.stream.connectors.xml.impl
-import pekko.stream.scaladsl.Flow
+import pekko.stream.scaladsl.{ Flow, FlowWithContext }
 import pekko.util.ByteString
 import com.fasterxml.aalto.AsyncXMLInputFactory
 import org.w3c.dom.Element
@@ -50,7 +50,28 @@ object XmlParsing {
    */
   def parser(ignoreInvalidChars: Boolean = false,
       configureFactory: AsyncXMLInputFactory => Unit = configureDefault): 
Flow[ByteString, ParseEvent, NotUsed] =
-    Flow.fromGraph(new impl.StreamingXmlParser(ignoreInvalidChars, 
configureFactory))
+    Flow[ByteString].via(
+      Flow.fromGraph(
+        new impl.StreamingXmlParser[ByteString, ParseEvent, 
Unit](ignoreInvalidChars,
+          configureFactory,
+          impl.StreamingXmlParser.ContextHandler.uncontextual)))
+
+  /**
+   * Parser Flow that takes a stream of ByteStrings and parses them to XML 
events similar to SAX while keeping
+   * a context attached.
+   *
+   * Upstream from akka/alpakka#2935 (which is now Apache licensed).
+   */
+  def parserWithContext[Ctx](
+      ignoreInvalidChars: Boolean = false,
+      configureFactory: AsyncXMLInputFactory => Unit =
+        configureDefault): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, 
NotUsed] =
+    FlowWithContext.fromTuples(
+      Flow.fromGraph(
+        new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx](
+          ignoreInvalidChars,
+          configureFactory,
+          impl.StreamingXmlParser.ContextHandler.contextual)))
 
   /**
    * A Flow that transforms a stream of XML ParseEvents. This stage coalesces 
consecutive CData and Characters
diff --git a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala 
b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
index 0187ba7cf..c6f0d3c13 100644
--- a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
+++ b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
@@ -18,7 +18,7 @@ import pekko.actor.ActorSystem
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.connectors.xml._
 import pekko.stream.connectors.xml.scaladsl.XmlParsing
-import pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
+import pekko.stream.scaladsl.{ Flow, Framing, Keep, Sink, Source }
 import pekko.util.ByteString
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.BeforeAndAfterAll
@@ -340,6 +340,42 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers 
with ScalaFutures with
       configWasCalled shouldBe true
     }
 
+    "parse XML and attach line numbers as context" in {
+      val doc = """|<doc>
+                   |  <elem>
+                   |    elem1
+                   |  </elem>
+                   |  <elem>
+                   |    elem2
+                   |  </elem>
+                   |</doc>""".stripMargin
+      val resultFuture = Source
+        .single(ByteString(doc))
+        .via(
+          Framing.delimiter(delimiter = ByteString(System.lineSeparator),
+            maximumFrameLength = 65536,
+            allowTruncation = true))
+        .zipWithIndex
+        
.runWith(XmlParsing.parserWithContext[Long]().asFlow.toMat(Sink.seq)(Keep.right))
+
+      resultFuture.futureValue should ===(
+        List(
+          (StartDocument, 0L),
+          (StartElement("doc"), 0L),
+          (Characters("  "), 1L),
+          (StartElement("elem"), 1L),
+          (Characters("    elem1"), 2L),
+          (Characters("  "), 3L),
+          (EndElement("elem"), 3L),
+          (Characters("  "), 4L),
+          (StartElement("elem"), 4L),
+          (Characters("    elem2"), 5L),
+          (Characters("  "), 6L),
+          (EndElement("elem"), 6L),
+          (EndElement("doc"), 7L),
+          (EndDocument, 7L)))
+    }
+
   }
 
   override protected def afterAll(): Unit = system.terminate()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to