This is an automated email from the ASF dual-hosted git repository. imbruced pushed a commit to branch fix-osm-parser in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 0fecf8681196a4ae325327809c09596f8a4095ef Author: pawelkocinski <[email protected]> AuthorDate: Wed Oct 8 00:06:21 2025 +0200 SEDONA-745 Fix osm parser. --- .../osmpbf/extractors/DenseNodeExtractor.java | 2 +- .../Extractor.java} | 33 +------ .../datasources/osmpbf/iterators/BlobIterator.java | 108 ++++++--------------- .../osmpbf/{ => iterators}/DenseNodeIterator.java | 17 ++-- .../NodeIterator.java} | 49 +++++++--- .../datasources/osmpbf/iterators/PbfIterator.java | 2 +- .../RelationIterator.java} | 41 ++++++-- .../WayIterator.java} | 41 ++++++-- .../sql/datasources/osm/OsmPartitionReader.scala | 4 +- .../common/src/test/resources/osmpbf/planetosm.pbf | Bin 0 -> 2185276 bytes .../org/apache/sedona/sql/OsmReaderTest.scala | 27 ++++++ 11 files changed, 177 insertions(+), 147 deletions(-) diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java index c65d2f870b..e6ac347762 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java @@ -22,7 +22,7 @@ import java.util.HashMap; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; import org.apache.sedona.sql.datasources.osmpbf.model.OsmNode; -public class DenseNodeExtractor { +public class DenseNodeExtractor implements Extractor { long latOffset; long lonOffset; long granularity; diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/DenseNodeIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/Extractor.java similarity index 51% copy from spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/DenseNodeIterator.java copy to spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/Extractor.java index 3c4013c7eb..c8c1174024 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/DenseNodeIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/Extractor.java @@ -16,36 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sedona.sql.datasources.osmpbf; +package org.apache.sedona.sql.datasources.osmpbf.extractors; -import java.util.Iterator; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; -import org.apache.sedona.sql.datasources.osmpbf.extractors.DenseNodeExtractor; -import org.apache.sedona.sql.datasources.osmpbf.model.OsmNode; +import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; -public class DenseNodeIterator implements Iterator<OsmNode> { - Osmformat.StringTable stringTable; - int idx; - long nodesSize; - DenseNodeExtractor extractor; - - public DenseNodeIterator( - long nodesSize, Osmformat.StringTable stringTable, DenseNodeExtractor extractor) { - this.stringTable = stringTable; - this.nodesSize = nodesSize; - this.idx = 0; - this.extractor = extractor; - } - - @Override - public boolean hasNext() { - return idx < nodesSize; - } - - @Override - public OsmNode next() { - OsmNode node = extractor.extract(idx, stringTable); - idx += 1; - return node; - } +public interface Extractor { + OSMEntity extract(int idx, Osmformat.StringTable stringTable); } diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java index ff7a2f88d2..26ff532ba3 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java @@ -23,128 +23,76 @@ import static org.apache.sedona.sql.datasources.osmpbf.ParseUtils.dataInputStrea import java.io.IOException; import java.util.Iterator; import java.util.zip.DataFormatException; -import org.apache.sedona.sql.datasources.osmpbf.DenseNodeIterator; import org.apache.sedona.sql.datasources.osmpbf.build.Fileformat.Blob; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; import org.apache.sedona.sql.datasources.osmpbf.extractors.DenseNodeExtractor; -import org.apache.sedona.sql.datasources.osmpbf.extractors.NodeExtractor; -import org.apache.sedona.sql.datasources.osmpbf.extractors.RelationExtractor; -import org.apache.sedona.sql.datasources.osmpbf.extractors.WaysExtractor; import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; -import org.apache.sedona.sql.datasources.osmpbf.model.OsmNode; public class BlobIterator implements Iterator<OSMEntity> { Blob blob; Osmformat.PrimitiveBlock primitiveBlock; int primitiveGroupIdx; - int osmEntityIdx; - DenseNodeIterator denseNodesIterator; + Iterator<OSMEntity> iterator; Osmformat.PrimitiveGroup currentPrimitiveGroup; public BlobIterator(Blob blob) throws DataFormatException, IOException { primitiveBlock = Osmformat.PrimitiveBlock.parseFrom(dataInputStreamBlob(blob)); primitiveGroupIdx = 0; - osmEntityIdx = 0; currentPrimitiveGroup = primitiveBlock.getPrimitivegroup(primitiveGroupIdx); + iterator = resolveIterator(); this.blob = blob; } @Override public boolean hasNext() { - return primitiveBlock.getPrimitivegroupList().size() != primitiveGroupIdx; + return primitiveGroupIdx < primitiveBlock.getPrimitivegroupList().size() - 1 + || iterator.hasNext(); } @Override public OSMEntity next() { - if (currentPrimitiveGroup == null) { - return null; + if (iterator.hasNext()) { + return iterator.next(); } - if (!currentPrimitiveGroup.getRelationsList().isEmpty()) { - return extractRelationPrimitiveGroup(); - } - - if (!currentPrimitiveGroup.getNodesList().isEmpty()) { - return extractNodePrimitiveGroup(); - } - - if (!currentPrimitiveGroup.getWaysList().isEmpty()) { - return extractWayPrimitiveGroup(); - } - - if (!currentPrimitiveGroup.getChangesetsList().isEmpty()) { - return null; - } - - if (currentPrimitiveGroup.getDense() != null) { - return extractDenseNodePrimitiveGroup(); - } - - return null; - } + primitiveGroupIdx += 1; - private OSMEntity extractNodePrimitiveGroup() { - osmEntityIdx += 1; - if (currentPrimitiveGroup.getNodesList().size() == osmEntityIdx) { - nextEntity(); - } + currentPrimitiveGroup = primitiveBlock.getPrimitivegroup(primitiveGroupIdx); - Osmformat.StringTable stringTable = primitiveBlock.getStringtable(); + iterator = resolveIterator(); - return new NodeExtractor(currentPrimitiveGroup, primitiveBlock) - .extract(osmEntityIdx, stringTable); + return iterator.next(); } - public OSMEntity extractDenseNodePrimitiveGroup() { - if (denseNodesIterator == null) { - denseNodesIterator = - new DenseNodeIterator( - currentPrimitiveGroup.getDense().getIdCount(), - primitiveBlock.getStringtable(), - new DenseNodeExtractor( - currentPrimitiveGroup.getDense(), - primitiveBlock.getLatOffset(), - primitiveBlock.getLonOffset(), - primitiveBlock.getGranularity())); + Iterator<OSMEntity> resolveIterator() { + if (!currentPrimitiveGroup.getWaysList().isEmpty()) { + return new WayIterator(currentPrimitiveGroup.getWaysList(), primitiveBlock.getStringtable()); } - OsmNode node = denseNodesIterator.next(); - - if (!denseNodesIterator.hasNext()) { - denseNodesIterator = null; - nextEntity(); + if (!currentPrimitiveGroup.getRelationsList().isEmpty()) { + return new RelationIterator( + currentPrimitiveGroup.getRelationsList(), primitiveBlock.getStringtable()); } - return node; - } - - public OSMEntity extractWayPrimitiveGroup() { - osmEntityIdx += 1; - if (currentPrimitiveGroup.getWaysList().size() == osmEntityIdx) { - nextEntity(); + if (!currentPrimitiveGroup.getNodesList().isEmpty()) { + return new NodeIterator(currentPrimitiveGroup.getNodesList(), primitiveBlock); } - return new WaysExtractor(currentPrimitiveGroup, primitiveBlock.getStringtable()) - .extract(osmEntityIdx); - } - - public OSMEntity extractRelationPrimitiveGroup() { - osmEntityIdx += 1; - if (currentPrimitiveGroup.getRelationsList().size() == osmEntityIdx) { - nextEntity(); + if (currentPrimitiveGroup.getDense() != null) { + return new DenseNodeIterator( + currentPrimitiveGroup.getDense().getIdCount(), + primitiveBlock.getStringtable(), + new DenseNodeExtractor( + currentPrimitiveGroup.getDense(), + primitiveBlock.getLatOffset(), + primitiveBlock.getLonOffset(), + primitiveBlock.getGranularity())); } - Osmformat.StringTable stringTable = primitiveBlock.getStringtable(); - - return new RelationExtractor(currentPrimitiveGroup, stringTable).extract(osmEntityIdx); - } - - public void nextEntity() { - primitiveGroupIdx += 1; - osmEntityIdx = 0; + return null; } } diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/DenseNodeIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/DenseNodeIterator.java similarity index 71% rename from spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/DenseNodeIterator.java rename to spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/DenseNodeIterator.java index 3c4013c7eb..9d537bdd38 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/DenseNodeIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/DenseNodeIterator.java @@ -16,21 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sedona.sql.datasources.osmpbf; +package org.apache.sedona.sql.datasources.osmpbf.iterators; import java.util.Iterator; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; -import org.apache.sedona.sql.datasources.osmpbf.extractors.DenseNodeExtractor; -import org.apache.sedona.sql.datasources.osmpbf.model.OsmNode; +import org.apache.sedona.sql.datasources.osmpbf.extractors.Extractor; +import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; -public class DenseNodeIterator implements Iterator<OsmNode> { +public class DenseNodeIterator implements Iterator<OSMEntity> { Osmformat.StringTable stringTable; int idx; long nodesSize; - DenseNodeExtractor extractor; + Extractor extractor; - public DenseNodeIterator( - long nodesSize, Osmformat.StringTable stringTable, DenseNodeExtractor extractor) { + public DenseNodeIterator(long nodesSize, Osmformat.StringTable stringTable, Extractor extractor) { this.stringTable = stringTable; this.nodesSize = nodesSize; this.idx = 0; @@ -43,8 +42,8 @@ public class DenseNodeIterator implements Iterator<OsmNode> { } @Override - public OsmNode next() { - OsmNode node = extractor.extract(idx, stringTable); + public OSMEntity next() { + OSMEntity node = extractor.extract(idx, stringTable); idx += 1; return node; } diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/NodeExtractor.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/NodeIterator.java similarity index 65% rename from spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/NodeExtractor.java rename to spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/NodeIterator.java index 9ec3147b1e..be966f912e 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/NodeExtractor.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/NodeIterator.java @@ -16,30 +16,57 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sedona.sql.datasources.osmpbf.extractors; +package org.apache.sedona.sql.datasources.osmpbf.iterators; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; import org.apache.sedona.sql.datasources.osmpbf.features.TagsResolver; +import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; import org.apache.sedona.sql.datasources.osmpbf.model.OsmNode; -public class NodeExtractor { - - Osmformat.PrimitiveGroup primitiveGroup; +public class NodeIterator implements Iterator<OSMEntity> { + int idx; + long nodesCount; + List<Osmformat.Node> nodes; + Osmformat.StringTable stringTable; Osmformat.PrimitiveBlock primitiveBlock; - public NodeExtractor( - Osmformat.PrimitiveGroup primitiveGroup, Osmformat.PrimitiveBlock primitiveBlock) { - this.primitiveGroup = primitiveGroup; + public NodeIterator(List<Osmformat.Node> nodes, Osmformat.PrimitiveBlock primitiveBlock) { + this.idx = 0; + this.nodesCount = 0; + this.nodes = nodes; + this.stringTable = primitiveBlock.getStringtable(); this.primitiveBlock = primitiveBlock; + + if (nodes != null) { + this.nodesCount = nodes.size(); + } + } + + @Override + public boolean hasNext() { + return idx < nodesCount; + } + + @Override + public OSMEntity next() { + if (idx < nodesCount) { + OsmNode node = extract(idx); + idx++; + return node; + } + + return null; } - public OsmNode extract(int idx, Osmformat.StringTable stringTable) { - return parse(idx, stringTable); + public OsmNode extract(int idx) { + return parse(idx); } - private OsmNode parse(int idx, Osmformat.StringTable stringTable) { - Osmformat.Node node = primitiveGroup.getNodes(idx); + private OsmNode parse(int idx) { + Osmformat.Node node = nodes.get(idx); long id = node.getId(); long latitude = node.getLat(); diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/PbfIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/PbfIterator.java index 381586d421..dec47b6432 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/PbfIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/PbfIterator.java @@ -56,7 +56,7 @@ public class PbfIterator implements Iterator<OSMEntity> { } private BlobIterator readNextBlock() throws DataFormatException, IOException { - while (pmGroupIterator.hasNext()) { + if (pmGroupIterator.hasNext()) { BlobData next = pmGroupIterator.next(); if (next.getHeader().getType().equals("OSMData")) { return new BlobIterator(next.getBlob()); diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/RelationExtractor.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/RelationIterator.java similarity index 76% rename from spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/RelationExtractor.java rename to spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/RelationIterator.java index aa04189caa..b0b3164351 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/RelationExtractor.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/RelationIterator.java @@ -16,27 +16,52 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sedona.sql.datasources.osmpbf.extractors; +package org.apache.sedona.sql.datasources.osmpbf.iterators; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; import org.apache.sedona.sql.datasources.osmpbf.features.TagsResolver; +import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; import org.apache.sedona.sql.datasources.osmpbf.model.Relation; import org.apache.sedona.sql.datasources.osmpbf.model.RelationType; -public class RelationExtractor { - - Osmformat.PrimitiveGroup primitiveGroup; +public class RelationIterator implements Iterator<OSMEntity> { + int idx; + long relationCount; + List<Osmformat.Relation> relations; Osmformat.StringTable stringTable; - public RelationExtractor( - Osmformat.PrimitiveGroup primitiveGroup, Osmformat.StringTable stringTable) { - this.primitiveGroup = primitiveGroup; + public RelationIterator(List<Osmformat.Relation> relations, Osmformat.StringTable stringTable) { + this.idx = 0; + this.relationCount = 0; + this.relations = relations; this.stringTable = stringTable; + + if (relations != null) { + this.relationCount = relations.size(); + } + } + + @Override + public boolean hasNext() { + return idx < relationCount; + } + + @Override + public OSMEntity next() { + if (idx < relationCount) { + Relation relation = extract(idx); + idx++; + return relation; + } + + return null; } public Relation extract(int idx) { - Osmformat.Relation relation = primitiveGroup.getRelations(idx); + Osmformat.Relation relation = relations.get(idx); return parse(relation); } diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/WaysExtractor.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/WayIterator.java similarity index 66% rename from spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/WaysExtractor.java rename to spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/WayIterator.java index 4c56ac5738..85693ec28a 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/WaysExtractor.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/WayIterator.java @@ -16,24 +16,51 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sedona.sql.datasources.osmpbf.extractors; +package org.apache.sedona.sql.datasources.osmpbf.iterators; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; import org.apache.sedona.sql.datasources.osmpbf.features.TagsResolver; +import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; import org.apache.sedona.sql.datasources.osmpbf.model.Way; -public class WaysExtractor { - Osmformat.PrimitiveGroup primitiveGroup; +public class WayIterator implements Iterator<OSMEntity> { + int idx; + long waysCount; + List<Osmformat.Way> ways; Osmformat.StringTable stringTable; - public WaysExtractor(Osmformat.PrimitiveGroup primitiveGroup, Osmformat.StringTable stringTable) { - this.primitiveGroup = primitiveGroup; + public WayIterator(List<Osmformat.Way> ways, Osmformat.StringTable stringTable) { + this.idx = 0; + this.waysCount = 0; + this.ways = ways; this.stringTable = stringTable; + + if (ways != null) { + this.waysCount = ways.size(); + } + } + + @Override + public boolean hasNext() { + return idx < waysCount; + } + + @Override + public Way next() { + if (idx < waysCount) { + Way way = extract(idx); + idx++; + return way; + } + + return null; } - public Way extract(int idx) { - Osmformat.Way way = primitiveGroup.getWays(idx); + private Way extract(int idx) { + Osmformat.Way way = ways.get(idx); return parse(way); } diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala index 772a18d0d3..a41417816b 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala @@ -53,7 +53,9 @@ case class OsmPartitionReader( f.seek(file.start + offset) new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map( - record => resolveEntity(record, requiredSchema)) + record => { + resolveEntity(record, requiredSchema) + }) } def findOffset(fs: FileSystem, status: FileStatus, start: Long): Long = { diff --git a/spark/common/src/test/resources/osmpbf/planetosm.pbf b/spark/common/src/test/resources/osmpbf/planetosm.pbf new file mode 100644 index 0000000000..d54a588ae3 Binary files /dev/null and b/spark/common/src/test/resources/osmpbf/planetosm.pbf differ diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala index 0d791b2b24..8c61ae0785 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala @@ -31,6 +31,7 @@ class OsmReaderTest extends TestBaseScala with Matchers { val monacoPath: String = resourceFolder + "osmpbf/monaco-latest.osm.pbf" val densePath: String = resourceFolder + "osmpbf/dense.pbf" val nodesPath: String = resourceFolder + "osmpbf/nodes.pbf" + val planetOsmPath: String = resourceFolder + "osmpbf/planetosm.pbf" import sparkSession.implicits._ @@ -48,6 +49,32 @@ class OsmReaderTest extends TestBaseScala with Matchers { assert(cnt > 0) } + it("should be able to process planet osm files") { + val numberOfUniques = sparkSession.read + .format("osmpbf") + .load(planetOsmPath) + .dropDuplicates("id") + .count + + val numberOfElements = sparkSession.read + .format("osmpbf") + .load(planetOsmPath) + .count + + numberOfUniques shouldBe 64000 + numberOfElements shouldBe 64000 + + val idsToVerify = Seq(64949611, 64955092, 64949580, 64949694, 64949868, 64958096, 64958295) + + val elementsCount = sparkSession.read + .format("osmpbf") + .load(planetOsmPath) + .where($"id".isin(idsToVerify: _*)) + .count + + elementsCount shouldBe idsToVerify.length + } + it("should parse normal nodes") { sparkSession.read .format("osmpbf")
