Ok, I stand corrected. I ran the experiment I was suggesting, and I got the
desired results only with the API you have submitted:
```java
<K1, V1> KStreamImpl<K1, V1> flatTransform(
    final TransformerSupplier<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> transformerSupplier,
    String... storeNames) {
    return null;
}

public static class KV<K, V> extends KeyValue<K, V> {
    public KV(final K key, final V value) {
        super(key, value);
    }
}

public static void main(String[] args) {
    final KStreamImpl<Integer, Long> stream = new KStreamImpl<>(null, null, null, false, null);

    // exact transformer
    final KStreamImpl<Integer, Long> stream2 = stream.flatTransform(
        new TransformerSupplier<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
            @Override
            public Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>> get() {
                return new Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
                    @Override
                    public void init(final ProcessorContext context) {}

                    @Override
                    public Iterable<KeyValue<Integer, Long>> transform(final Integer key, final Long value) {
                        return Arrays.asList(new KV<>(key, value), new KeyValue<>(key, value));
                    }

                    @Override
                    public void close() {}
                };
            }
        }
    );

    // transformer that takes superclass k/v and returns exact results
    final KStreamImpl<Integer, Long> stream3 = stream.flatTransform(
        new TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
            @Override
            public Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>> get() {
                return new Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
                    @Override
                    public void init(final ProcessorContext context) {}

                    @Override
                    public Iterable<KeyValue<Integer, Long>> transform(final Number key, final Number value) {
                        return Arrays.asList(new KV<>(key.intValue(), value.longValue()), new KeyValue<>(1, 3L));
                    }

                    @Override
                    public void close() {}
                };
            }
        }
    );

    // transformer that takes exact parameters and returns subclass results
    final KStreamImpl<Number, Number> stream4 = stream.flatTransform(
        new TransformerSupplier<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
            @Override
            public Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>> get() {
                return new Transformer<Integer, Long, Iterable<KeyValue<Integer, Long>>>() {
                    @Override
                    public void init(final ProcessorContext context) {}

                    @Override
                    public Iterable<KeyValue<Integer, Long>> transform(final Integer key, final Long value) {
                        return Arrays.asList(new KV<>(key, value), new KeyValue<>(1, 3L));
                    }

                    @Override
                    public void close() {}
                };
            }
        }
    );

    // transformer that takes superclass parameters and returns subclass results
    final KStreamImpl<Number, Number> stream5 = stream.flatTransform(
        new TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
            @Override
            public Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>> get() {
                return new Transformer<Number, Number, Iterable<KeyValue<Integer, Long>>>() {
                    @Override
                    public void init(final ProcessorContext context) {}

                    @Override
                    public Iterable<KeyValue<Integer, Long>> transform(final Number key, final Number value) {
                        return Arrays.asList(new KV<>(key.intValue(), value.longValue()), new KeyValue<>(1, 3L));
                    }

                    @Override
                    public void close() {}
                };
            }
        }
    );
}
```
If you take out any of the `? extends` wildcards from the signature, it won't compile.
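
For anyone who wants to try this without the Kafka Streams classes on the classpath, here is a minimal stand-alone sketch of the same wildcard shape. `WildcardSketch`, `Pair`, `Fn`, `flatApply` and `demo` are hypothetical names made up for illustration (rough stand-ins for `KeyValue`, `Transformer` and `flatTransform`); none of them is Kafka API, and the body of `flatApply` is only there to make the example do something.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WildcardSketch {

    // Hypothetical stand-in for KeyValue<K, V>.
    static class Pair<K, V> {
        final K key;
        final V value;

        Pair(final K key, final V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for Transformer<K, V, R>.
    interface Fn<K, V, R> {
        R apply(K key, V value);
    }

    // Same wildcard shape as the flatTransform signature in the experiment above.
    static <K, V, K1, V1> List<Pair<K1, V1>> flatApply(
            final Fn<? super K, ? super V, ? extends Iterable<? extends Pair<? extends K1, ? extends V1>>> fn,
            final K key,
            final V value) {
        final List<Pair<K1, V1>> out = new ArrayList<>();
        for (final Pair<? extends K1, ? extends V1> p : fn.apply(key, value)) {
            // Re-wrap each element so the result has exactly the requested types.
            out.add(new Pair<K1, V1>(p.key, p.value));
        }
        return out;
    }

    static String demo() {
        // Takes superclass parameters (Number) and returns subclass results (Integer, Long).
        final Fn<Number, Number, List<Pair<Integer, Long>>> fn =
            (k, v) -> Arrays.asList(new Pair<Integer, Long>(k.intValue(), v.longValue()));

        // Generics are invariant: a List<Pair<Integer, Long>> is not an
        // Iterable<Pair<Number, Number>>, so this call relies on the wildcards.
        final List<Pair<Number, Number>> result =
            WildcardSketch.<Integer, Long, Number, Number>flatApply(fn, 1, 3L);

        return result.get(0).key + " -> " + result.get(0).value;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints "1 -> 3"
    }
}
```

The explicit type arguments on the `flatApply` call are only there to make the intended `K`, `V`, `K1`, `V1` visible; the call corresponds to the `stream5` case above.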
[ Full content available at: https://github.com/apache/kafka/pull/5273 ]