This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5656-e0a96478817a5c7a2f585de1952e40f7c8ba534f
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 7ae9b35f12748616daf7bcc925fdde2e5def5187
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 17:30:08 2026 -0700

test(workflow-operator): add unit test coverage for filter-family operator executors (#5656)

### What changes were proposed in this PR?

Pin behavior of four previously-uncovered modules in the `FilterOpExec` inheritance hierarchy in `common/workflow-operator`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `FilterOpExecSpec` | `FilterOpExec` (abstract base) | 8 |
| `RegexOpExecSpec` | `RegexOpExec` | 10 |
| `SubstringSearchOpExecSpec` | `SubstringSearchOpExec` | 10 |
| `RandomKSamplingOpExecSpec` | `RandomKSamplingOpExec` | 6 |

All four spec files follow the `<srcClassName>Spec.scala` one-to-one convention. `SpecializedFilterOpExec` already has its own spec; this PR covers the rest of the family.

**Behavior pinned — `FilterOpExec`**

| Surface | Contract |
| --- | --- |
| `processTuple` (matching predicate) | yields the input tuple as a single-element iterator |
| `processTuple` (non-matching predicate) | yields an empty iterator |
| `processTuple` | passes the actual tuple instance to the predicate; ignores the `port` argument |
| `setFilterFunc` | swapping the predicate changes the next `processTuple` result; value-aware predicates branch per-tuple |
| Type contract | `FilterOpExec` is a `Serializable` `OperatorExecutor` |

**Behavior pinned — `RegexOpExec`**

| Surface | Contract |
| --- | --- |
| matching regex | yields the tuple |
| find-semantics | unanchored substring match (not full-string `matches`) |
| `caseInsensitive = true` / `false` | matches case-(in)sensitively |
| invalid regex string | construction succeeds (lazy `Pattern`); `PatternSyntaxException` surfaces on first `processTuple` |
| repeated invocations | pattern stays cached; results are stable |
| malformed descriptor JSON | construction throws `JsonProcessingException` |

**Behavior pinned — `SubstringSearchOpExec`**

| Surface | Contract |
| --- | --- |
| substring present / absent | yields tuple / nothing |
| position in value (start / middle / end) | irrelevant — `String.contains` semantics |
| `isCaseSensitive = true` / `false` | case-(in)sensitive (when insensitive, both sides are lowercased before `String.contains`) |
| empty substring | matches every value, including the empty string |
| repeated invocations | results stable |
| malformed descriptor JSON | construction throws `JsonProcessingException` |

**Behavior pinned — `RandomKSamplingOpExec`**

| Surface | Contract |
| --- | --- |
| `percentage = 100` | accepts every tuple (1000-sample run) |
| `percentage = 0` | rejects every tuple (1000-sample run) |
| Same `workerCount` + `percentage` | identical emission count across two fresh instances (deterministic seed) |
| `percentage = 50` | approximately half pass (within ±150 of 1000 over 2000 draws) |
| Different `workerCount` | divergent emission sequences (the seed is `workerCount`) |
| malformed descriptor JSON | construction throws `JsonProcessingException` |

`FilterOpExec` is abstract, so the spec uses a minimal test-only concrete subclass that exposes `setFilterFunc` for behavior-only assertions. The three subclass specs build descriptor JSON via `objectMapper.writeValueAsString` of a fresh `*OpDesc` (same fixture pattern as the existing `SpecializedFilterOpExecSpec`).

### Any related issues, documentation, discussions?

Closes #5652.

### How was this PR tested?

Pure unit-test additions; verified locally with:

- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.filter.FilterOpExecSpec org.apache.texera.amber.operator.regex.RegexOpExecSpec org.apache.texera.amber.operator.substringSearch.SubstringSearchOpExecSpec org.apache.texera.amber.operator.randomksampling.RandomKSamplingOpExecSpec"` — 34 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7 [1M context])
---
 .../amber/operator/filter/FilterOpExecSpec.scala   | 130 +++++++++++++++++++
 .../RandomKSamplingOpExecSpec.scala                | 129 +++++++++++++++++++
 .../amber/operator/regex/RegexOpExecSpec.scala     | 139 +++++++++++++++++++++
 .../SubstringSearchOpExecSpec.scala                | 133 ++++++++++++++++++++
 4 files changed, 531 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala
new file mode 100644
index 0000000000..49935866ca
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/filter/FilterOpExecSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.filter
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class FilterOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Test harness — FilterOpExec is abstract. The existing
+  // SpecializedFilterOpExecSpec covers the full SpecializedFilterOpExec
+  // (predicate parsing + many AttributeType cases); here we exercise the
+  // base trait's contract directly via a minimal concrete subclass that
+  // exposes setFilterFunc.
+  // ---------------------------------------------------------------------------
+
+  private class BareFilterOpExec extends FilterOpExec
+
+  // A one-attribute tuple is enough to drive predicate evaluation.
+  private val attr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+  // ---------------------------------------------------------------------------
+  // processTuple — pass-through when predicate matches
+  // ---------------------------------------------------------------------------
+
+  "FilterOpExec.processTuple" should
+    "yield the input tuple when filterFunc returns true" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    val t = tuple(42)
+    val out = exec.processTuple(t, port = 0).toList
+    assert(out == List(t))
+  }
+
+  it should "yield an empty Iterator when filterFunc returns false" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => false)
+    val out = exec.processTuple(tuple(42), port = 0).toList
+    assert(out.isEmpty)
+  }
+
+  it should "evaluate filterFunc against the actual tuple (not a copy or null)" in {
+    // Capture what the predicate sees — must equal the argument passed in.
+    val exec = new BareFilterOpExec
+    var seen: Tuple = null
+    exec.setFilterFunc { t =>
+      seen = t
+      true
+    }
+    val t = tuple(7)
+    val _ = exec.processTuple(t, port = 0).toList
+    assert(seen eq t, "filterFunc must receive the same Tuple instance")
+  }
+
+  it should "ignore the port argument (port-agnostic by contract)" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    val t = tuple(1)
+    val portsTested = List(0, 1, 7, Int.MaxValue, -1)
+    portsTested.foreach { p =>
+      assert(exec.processTuple(t, port = p).toList == List(t), s"port=$p")
+    }
+  }
+
+  it should "produce an iterator with exactly one element when the predicate matches" in {
+    // The contract is "single tuple" — not a multi-element iterator.
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    val iter = exec.processTuple(tuple(1), port = 0)
+    assert(iter.hasNext)
+    iter.next()
+    assert(!iter.hasNext, "iterator must be exhausted after the single match")
+  }
+
+  // ---------------------------------------------------------------------------
+  // setFilterFunc — swapping the predicate
+  // ---------------------------------------------------------------------------
+
+  "FilterOpExec.setFilterFunc" should
+    "swap the predicate for subsequent processTuple calls" in {
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(_ => true)
+    assert(exec.processTuple(tuple(1), port = 0).toList.size == 1)
+    exec.setFilterFunc(_ => false)
+    assert(exec.processTuple(tuple(1), port = 0).toList.isEmpty)
+  }
+
+  it should "accept a value-aware predicate that branches on the tuple's content" in {
+    // Pin that the predicate is genuinely consulted per-tuple (not memoized).
+    val exec = new BareFilterOpExec
+    exec.setFilterFunc(t => t.getField[Integer](0).intValue() % 2 == 0)
+    assert(exec.processTuple(tuple(2), port = 0).toList == List(tuple(2)))
+    assert(exec.processTuple(tuple(3), port = 0).toList.isEmpty)
+    assert(exec.processTuple(tuple(4), port = 0).toList == List(tuple(4)))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Serializable conformance — FilterOpExec extends Serializable so
+  // executors can ship over the wire.
+  // ---------------------------------------------------------------------------
+
+  "FilterOpExec" should "be a Serializable OperatorExecutor (compile-time enforced)" in {
+    val exec: java.io.Serializable = new BareFilterOpExec
+    assert(exec != null)
+    val asOpExec: org.apache.texera.amber.core.executor.OperatorExecutor =
+      new BareFilterOpExec
+    assert(asOpExec != null)
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala
new file mode 100644
index 0000000000..ea4f78fe0f
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/randomksampling/RandomKSamplingOpExecSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.randomksampling
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RandomKSamplingOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixture builders
+  // ---------------------------------------------------------------------------
+
+  private val attr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(attr, Integer.valueOf(v)).build()
+
+  private def descJson(percentage: Int): String = {
+    val desc = new RandomKSamplingOpDesc
+    desc.percentage = percentage
+    objectMapper.writeValueAsString(desc)
+  }
+
+  /** Run `count` tuples through `exec` and return how many it emitted. */
+  private def emittedCount(exec: RandomKSamplingOpExec, count: Int): Int =
+    (1 to count).count(i => exec.processTuple(tuple(i), port = 0).nonEmpty)
+
+  // ---------------------------------------------------------------------------
+  // Boundary cases — 0% and 100%
+  // ---------------------------------------------------------------------------
+  //
+  // The predicate is `(desc.percentage / 100.0) >= rand.nextDouble()`.
+  // `Random.nextDouble()` returns a value in `[0.0, 1.0)`.
+  //
+  // - At 100% (`1.0`), `1.0 >= rand.nextDouble()` always holds → accept all.
+  // - At 0% (`0.0`), `0.0 >= rand.nextDouble()` holds iff `nextDouble()`
+  //   returns `0.0`. The probability of that is 1 / 2^53 ≈ 10^-16 — for
+  //   practical purposes, reject all.
+
+  "RandomKSamplingOpExec with percentage = 100" should "accept every tuple" in {
+    val exec = new RandomKSamplingOpExec(descJson(percentage = 100), idx = 0, workerCount = 7)
+    assert(emittedCount(exec, 1000) == 1000)
+  }
+
+  "RandomKSamplingOpExec with percentage = 0" should "reject every tuple" in {
+    // Edge case (`rand.nextDouble() == 0.0` would let one through) is
+    // astronomically improbable — running 1000 draws with a fixed seed
+    // either always passes or always fails. The latter is what the
+    // implementation produces for percentage 0.
+    val exec = new RandomKSamplingOpExec(descJson(percentage = 0), idx = 0, workerCount = 7)
+    assert(emittedCount(exec, 1000) == 0)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Determinism — seed = workerCount, so the same (workerCount,
+  // percentage, input-count) produces the same emission count across runs.
+  // ---------------------------------------------------------------------------
+
+  "RandomKSamplingOpExec with the same workerCount and percentage" should
+    "produce the same emission count across two fresh instances (deterministic seed)" in {
+    val a = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 13)
+    val b = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 1, workerCount = 13)
+    val countA = emittedCount(a, 200)
+    val countB = emittedCount(b, 200)
+    assert(countA == countB, s"deterministic seed should give equal counts, got $countA vs $countB")
+  }
+
+  it should "yield approximately the requested fraction over a large sample" in {
+    // At 50% over 2000 tuples, the expected emission count is ~1000.
+    // For a binomial(2000, 0.5), 3σ is ~67 — allow a ±150 band so the
+    // case is well clear of stochastic flakiness while still catching
+    // gross deviations (e.g. percentage being ignored).
+    val exec = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 1)
+    val n = emittedCount(exec, 2000)
+    assert(n >= 850 && n <= 1150, s"expected ~1000 emissions at 50%%, got $n")
+  }
+
+  // ---------------------------------------------------------------------------
+  // Different worker seeds — different streams
+  // ---------------------------------------------------------------------------
+
+  "RandomKSamplingOpExec with different workerCount values" should
+    "draw different sequences (the seed is workerCount)" in {
+    // Two executors with the same percentage but different workerCount
+    // should not produce IDENTICAL emission sequences over a meaningful
+    // sample — the seed is workerCount, so the streams diverge.
+    val a = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 1)
+    val b = new RandomKSamplingOpExec(descJson(percentage = 50), idx = 0, workerCount = 2)
+    val emissionsA = (1 to 100).map(i => execEmit(a, i))
+    val emissionsB = (1 to 100).map(i => execEmit(b, i))
+    assert(
+      emissionsA != emissionsB,
+      "different workerCount seeds must produce different emission sequences"
+    )
+  }
+
+  private def execEmit(exec: RandomKSamplingOpExec, i: Int): Boolean =
+    exec.processTuple(tuple(i), port = 0).nonEmpty
+
+  // ---------------------------------------------------------------------------
+  // Descriptor parse failure
+  // ---------------------------------------------------------------------------
+
+  "RandomKSamplingOpExec construction" should
+    "throw on malformed descriptor JSON" in {
+    intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+      new RandomKSamplingOpExec("{not valid", idx = 0, workerCount = 1)
+    }
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala
new file mode 100644
index 0000000000..3d93539876
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/regex/RegexOpExecSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.regex
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class RegexOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixture builders — mirrors the pattern used by SpecializedFilterOpExecSpec
+  // ---------------------------------------------------------------------------
+
+  private val attr = new Attribute("body", AttributeType.STRING)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(text: String): Tuple =
+    Tuple.builder(schema).add(attr, text).build()
+
+  private def descJson(regex: String, caseInsensitive: Boolean = false): String = {
+    val desc = new RegexOpDesc
+    desc.attribute = "body"
+    desc.regex = regex
+    desc.caseInsensitive = caseInsensitive
+    objectMapper.writeValueAsString(desc)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Pattern matching — find-semantics (substring match, not full match)
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec" should "yield the input tuple when the regex matches the attribute" in {
+    val exec = new RegexOpExec(descJson(regex = "hello"))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "use find-semantics — a pattern matches if it appears anywhere in the value" in {
+    // `Matcher.find` succeeds on a substring; it is not anchored. Pin
+    // this so a future refactor that switched to `matches` (full-string)
+    // would surface here.
+    val exec = new RegexOpExec(descJson(regex = "abc"))
+    val t = tuple("xx abc xx")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "yield nothing when the regex does not match" in {
+    val exec = new RegexOpExec(descJson(regex = "foo"))
+    assert(exec.processTuple(tuple("bar baz"), port = 0).toList.isEmpty)
+  }
+
+  it should "yield the tuple when the regex character class matches at least one char" in {
+    val exec = new RegexOpExec(descJson(regex = "\\d+"))
+    assert(exec.processTuple(tuple("answer is 42 plus"), port = 0).toList.size == 1)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Case sensitivity
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec with caseInsensitive = true" should "match case-insensitively" in {
+    val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = true))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  "RegexOpExec with caseInsensitive = false" should
+    "NOT match when the case differs (default behavior)" in {
+    val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = false))
+    assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+  }
+
+  it should "still match identical case under case-sensitive mode" in {
+    val exec = new RegexOpExec(descJson(regex = "HELLO", caseInsensitive = false))
+    val t = tuple("Say HELLO!")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Pattern compilation laziness — `pattern` is a lazy val; pin that
+  // construction does not eagerly compile (so a bad regex doesn't blow
+  // up at the wrong time).
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec" should
+    "tolerate construction with an invalid regex (compilation is lazy on `pattern`)" in {
+    // `[` is an invalid character class — but the pattern is lazily
+    // compiled inside `matchRegex`. The constructor must succeed; the
+    // failure only surfaces on the first processTuple call.
+    val exec = new RegexOpExec(descJson(regex = "["))
+    intercept[java.util.regex.PatternSyntaxException] {
+      exec.processTuple(tuple("anything"), port = 0).toList
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Repeated invocations — pattern stays cached (lazy val), behavior stable
+  // ---------------------------------------------------------------------------
+
+  it should "produce stable results across repeated processTuple calls (pattern cached)" in {
+    val exec = new RegexOpExec(descJson(regex = "match"))
+    val hit = tuple("match here")
+    val miss = tuple("no signal")
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+    assert(exec.processTuple(miss, port = 0).toList.isEmpty)
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Descriptor parse failure surfaces during construction
+  // ---------------------------------------------------------------------------
+
+  "RegexOpExec construction" should
+    "throw on malformed descriptor JSON" in {
+    // The constructor calls objectMapper.readValue; mis-formed JSON must
+    // propagate as a Jackson parse exception, not silently fall through
+    // to a half-constructed executor.
+    intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+      new RegexOpExec("{not valid json")
+    }
+  }
+}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala
new file mode 100644
index 0000000000..83fd90fee0
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/substringSearch/SubstringSearchOpExecSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.substringSearch
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class SubstringSearchOpExecSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixture builders
+  // ---------------------------------------------------------------------------
+
+  private val attr = new Attribute("body", AttributeType.STRING)
+  private val schema: Schema = Schema().add(attr)
+  private def tuple(text: String): Tuple =
+    Tuple.builder(schema).add(attr, text).build()
+
+  private def descJson(substring: String, isCaseSensitive: Boolean = false): String = {
+    val desc = new SubstringSearchOpDesc
+    desc.attribute = "body"
+    desc.substring = substring
+    desc.isCaseSensitive = isCaseSensitive
+    objectMapper.writeValueAsString(desc)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Substring detection — match / no-match
+  // ---------------------------------------------------------------------------
+
+  "SubstringSearchOpExec" should "yield the input tuple when the substring is present" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "hello"))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "yield nothing when the substring is absent" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "missing"))
+    assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+  }
+
+  it should
+    "match when the substring sits anywhere in the value (start / middle / end)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "abc"))
+    assert(exec.processTuple(tuple("abc xx"), port = 0).toList.nonEmpty)
+    assert(exec.processTuple(tuple("xx abc xx"), port = 0).toList.nonEmpty)
+    assert(exec.processTuple(tuple("xx abc"), port = 0).toList.nonEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Case sensitivity
+  // ---------------------------------------------------------------------------
+
+  "SubstringSearchOpExec with isCaseSensitive = true" should
+    "match case-sensitively (case mismatch is rejected)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = true))
+    assert(exec.processTuple(tuple("hello world"), port = 0).toList.isEmpty)
+  }
+
+  it should "yield the tuple when the case matches under case-sensitive mode" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = true))
+    val t = tuple("Say HELLO loudly")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  "SubstringSearchOpExec with isCaseSensitive = false" should
+    "match case-insensitively (production lowercases both sides before String.contains)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "HELLO", isCaseSensitive = false))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  it should "still match identical-case values under case-insensitive mode" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "world", isCaseSensitive = false))
+    val t = tuple("hello world")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Edge: empty substring
+  // ---------------------------------------------------------------------------
+
+  it should "treat the empty substring as matching every value (Java String.contains(\"\") == true)" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = ""))
+    val t = tuple("any non-empty text")
+    assert(exec.processTuple(t, port = 0).toList == List(t))
+    // Even an empty value contains the empty substring.
+    val empty = tuple("")
+    assert(exec.processTuple(empty, port = 0).toList == List(empty))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Repeated invocations — predicate stays stable
+  // ---------------------------------------------------------------------------
+
+  it should "produce stable results across repeated processTuple calls" in {
+    val exec = new SubstringSearchOpExec(descJson(substring = "match"))
+    val hit = tuple("match here")
+    val miss = tuple("no signal")
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+    assert(exec.processTuple(miss, port = 0).toList.isEmpty)
+    assert(exec.processTuple(hit, port = 0).toList == List(hit))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Descriptor parse failure
+  // ---------------------------------------------------------------------------
+
+  "SubstringSearchOpExec construction" should
+    "throw on malformed descriptor JSON" in {
+    intercept[com.fasterxml.jackson.core.JsonProcessingException] {
+      new SubstringSearchOpExec("{not valid")
+    }
+  }
+}
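
The seeding and boundary reasoning in the `RandomKSamplingOpExecSpec` comments can be tried outside Texera with a small stand-alone sketch. `SeededSampler` and `SeededSamplerDemo` are hypothetical names, and the use of `scala.util.Random` is an assumption about the generator; only the predicate `(percentage / 100.0) >= rand.nextDouble()` and the idea of a fixed seed come from the spec comments. It is not the production executor.

```scala
import scala.util.Random

// Hypothetical stand-in for the sampling predicate described in the spec
// comments; not the Texera RandomKSamplingOpExec.
final class SeededSampler(percentage: Int, seed: Long) {
  // Assumption: a scala.util.Random seeded once at construction.
  private val rand = new Random(seed)

  // nextDouble() lies in [0.0, 1.0), so 100% always accepts and
  // 0% accepts only if the draw is exactly 0.0.
  def accept(): Boolean = (percentage / 100.0) >= rand.nextDouble()
}

object SeededSamplerDemo {
  // Count how many of n consecutive draws are accepted.
  def emitted(sampler: SeededSampler, n: Int): Int =
    (1 to n).count(_ => sampler.accept())

  def main(args: Array[String]): Unit = {
    // Boundary: 1.0 >= x holds for every x in [0.0, 1.0).
    assert(emitted(new SeededSampler(100, 7L), 1000) == 1000)

    // Determinism: equal seeds give equal draw sequences, hence equal counts.
    val a = emitted(new SeededSampler(50, 13L), 200)
    val b = emitted(new SeededSampler(50, 13L), 200)
    assert(a == b)

    // Tolerance band: for Binomial(2000, 0.5), sigma = sqrt(n * p * (1 - p)).
    val threeSigma = 3 * math.sqrt(2000 * 0.5 * 0.5)
    assert(math.abs(threeSigma - 67.08) < 0.01) // well inside the +/-150 band
  }
}
```

The ±150 band used by the spec is more than twice the 3σ width, which is why a fixed-seed run at 50% should sit comfortably inside it.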
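
The two `RegexOpExec` contracts pinned above, find-semantics and lazy pattern compilation, can likewise be illustrated with plain `java.util.regex`. `LazyRegexMatcher` and `LazyRegexDemo` are hypothetical names used only for this sketch; the real executor's fields and method names may differ.

```scala
import java.util.regex.{Pattern, PatternSyntaxException}

// Hypothetical illustration of a lazily compiled pattern; not the Texera class.
final class LazyRegexMatcher(regex: String, caseInsensitive: Boolean) {
  // `lazy val` defers Pattern.compile until the first match attempt,
  // so constructing the matcher never throws on a bad regex.
  private lazy val pattern: Pattern =
    Pattern.compile(regex, if (caseInsensitive) Pattern.CASE_INSENSITIVE else 0)

  // find(): succeeds if the pattern occurs anywhere in the text.
  def found(text: String): Boolean = pattern.matcher(text).find()

  // matches(): succeeds only if the whole text matches the pattern.
  def fullMatch(text: String): Boolean = pattern.matcher(text).matches()
}

object LazyRegexDemo {
  def main(args: Array[String]): Unit = {
    val m = new LazyRegexMatcher("abc", caseInsensitive = false)
    assert(m.found("xx abc xx"))      // unanchored substring match
    assert(!m.fullMatch("xx abc xx")) // full-string match would reject it

    val ci = new LazyRegexMatcher("HELLO", caseInsensitive = true)
    assert(ci.found("hello world"))

    // "[" is an unclosed character class: construction succeeds,
    // the failure surfaces only when the pattern is first used.
    val bad = new LazyRegexMatcher("[", caseInsensitive = false)
    val threw =
      try { bad.found("anything"); false }
      catch { case _: PatternSyntaxException => true }
    assert(threw)
  }
}
```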
