[ https://issues.apache.org/jira/browse/FLINK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-19739:
----------------------------
Priority: Major (was: Critical)
> CompileException when windowing in batch mode: A method named "replace" is not declared in any enclosing class nor any supertype
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19739
> URL: https://issues.apache.org/jira/browse/FLINK-19739
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API, Table SQL / Planner
> Affects Versions: 1.12.0, 1.11.2
> Environment: Ubuntu 18.04
> Python 3.8, jar built from master yesterday.
> Or Python 3.7, installed latest version from pip.
> Reporter: Alex Hall
> Priority: Major
> Fix For: 1.12.0
>
>
> Example script:
> {code:python}
> from pyflink.table import EnvironmentSettings, BatchTableEnvironment
> from pyflink.table.window import Tumble
> env_settings = (
>
> EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
> )
> table_env = BatchTableEnvironment.create(environment_settings=env_settings)
> table_env.execute_sql(
> """
> CREATE TABLE table1 (
> amount INT,
> ts TIMESTAMP(3),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'filesystem',
> 'format.type' = 'csv',
> 'connector.path' = '/home/alex/work/test-flink/data1.csv'
> )
> """
> )
> table1 = table_env.from_path("table1")
> table = (
> table1
> .window(Tumble.over("5.days").on("ts").alias("__window"))
> .group_by("__window")
> .select("amount.sum")
> )
> print(table.to_pandas())
> {code}
> Output:
> {code}
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil
> (file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar)
> to constructor java.nio.DirectByteBuffer(long,int)
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> /* 1 */
> /* 2 */ public class LocalHashWinAggWithoutKeys$59 extends
> org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */ implements
> org.apache.flink.streaming.api.operators.OneInputStreamOperator,
> org.apache.flink.streaming.api.operators.BoundedOneInput {
> /* 4 */
> /* 5 */ private final Object[] references;
> /* 6 */
> /* 7 */ private static final org.slf4j.Logger LOG$2 =
> /* 8 */ org.slf4j.LoggerFactory.getLogger("LocalHashWinAgg");
> /* 9 */
> /* 10 */ private transient
> org.apache.flink.table.types.logical.LogicalType[] aggMapKeyTypes$5;
> /* 11 */ private transient
> org.apache.flink.table.types.logical.LogicalType[] aggBufferTypes$6;
> /* 12 */ private transient
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap
> aggregateMap$7;
> /* 13 */ org.apache.flink.table.data.binary.BinaryRowData
> emptyAggBuffer$9 = new org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 14 */ org.apache.flink.table.data.writer.BinaryRowWriter
> emptyAggBufferWriterTerm$10 = new
> org.apache.flink.table.data.writer.BinaryRowWriter(emptyAggBuffer$9);
> /* 15 */ org.apache.flink.table.data.GenericRowData hashAggOutput =
> new org.apache.flink.table.data.GenericRowData(2);
> /* 16 */ private transient
> org.apache.flink.table.data.binary.BinaryRowData reuseAggMapKey$17 = new
> org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 17 */ private transient
> org.apache.flink.table.data.binary.BinaryRowData reuseAggBuffer$18 = new
> org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 18 */ private transient
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry
> reuseAggMapEntry$19 = new
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry(reuseAggMapKey$17,
> reuseAggBuffer$18);
> /* 19 */ org.apache.flink.table.data.binary.BinaryRowData aggMapKey$3
> = new org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 20 */ org.apache.flink.table.data.writer.BinaryRowWriter
> aggMapKeyWriter$4 = new
> org.apache.flink.table.data.writer.BinaryRowWriter(aggMapKey$3);
> /* 21 */ private boolean hasInput = false;
> /* 22 */ org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> element = new
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord((Object)null);
> /* 23 */ private final
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 24 */
> /* 25 */ public LocalHashWinAggWithoutKeys$59(
> /* 26 */ Object[] references,
> /* 27 */ org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 28 */ org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 29 */ org.apache.flink.streaming.api.operators.Output output,
> /* 30 */
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService
> processingTimeService) throws Exception {
> /* 31 */ this.references = references;
> /* 32 */ aggMapKeyTypes$5 =
> (((org.apache.flink.table.types.logical.LogicalType[]) references[0]));
> /* 33 */ aggBufferTypes$6 =
> (((org.apache.flink.table.types.logical.LogicalType[]) references[1]));
> /* 34 */ this.setup(task, config, output);
> /* 35 */ if (this instanceof
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 36 */
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 37 */ .setProcessingTimeService(processingTimeService);
> /* 38 */ }
> /* 39 */ }
> /* 40 */
> /* 41 */ @Override
> /* 42 */ public void open() throws Exception {
> /* 43 */ super.open();
> /* 44 */ aggregateMap$7 = new
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap(this.getContainingTask(),this.getContainingTask().getEnvironment().getMemoryManager(),computeMemorySize(),
> aggMapKeyTypes$5, aggBufferTypes$6);
> /* 45 */
> /* 46 */
> /* 47 */ emptyAggBufferWriterTerm$10.reset();
> /* 48 */
> /* 49 */
> /* 50 */ if (true) {
> /* 51 */ emptyAggBufferWriterTerm$10.setNullAt(0);
> /* 52 */ } else {
> /* 53 */ emptyAggBufferWriterTerm$10.writeInt(0, ((int) -1));
> /* 54 */ }
> /* 55 */
> /* 56 */ emptyAggBufferWriterTerm$10.complete();
> /* 57 */
> /* 58 */ }
> /* 59 */
> /* 60 */ @Override
> /* 61 */ public void
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> element) throws Exception {
> /* 62 */ org.apache.flink.table.data.RowData in1 =
> (org.apache.flink.table.data.RowData) element.getValue();
> /* 63 */
> /* 64 */ org.apache.flink.table.data.binary.BinaryRowData
> currentAggBuffer$8;
> /* 65 */ int field$11;
> /* 66 */ boolean isNull$11;
> /* 67 */ int field$12;
> /* 68 */ boolean isNull$12;
> /* 69 */ boolean isNull$13;
> /* 70 */ int result$14;
> /* 71 */
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo
> lookupInfo$20;
> /* 72 */ org.apache.flink.table.data.TimestampData field$21;
> /* 73 */ boolean isNull$21;
> /* 74 */ boolean isNull$22;
> /* 75 */ long result$23;
> /* 76 */ boolean isNull$24;
> /* 77 */ long result$25;
> /* 78 */ boolean isNull$26;
> /* 79 */ long result$27;
> /* 80 */ boolean isNull$28;
> /* 81 */ long result$29;
> /* 82 */ boolean isNull$30;
> /* 83 */ long result$31;
> /* 84 */ boolean isNull$32;
> /* 85 */ long result$33;
> /* 86 */ boolean isNull$34;
> /* 87 */ boolean result$35;
> /* 88 */ boolean isNull$36;
> /* 89 */ long result$37;
> /* 90 */ boolean isNull$38;
> /* 91 */ long result$39;
> /* 92 */ boolean isNull$40;
> /* 93 */ long result$41;
> /* 94 */ boolean isNull$42;
> /* 95 */ long result$43;
> /* 96 */ boolean isNull$44;
> /* 97 */ long result$45;
> /* 98 */ boolean isNull$46;
> /* 99 */ long result$47;
> /* 100 */ boolean isNull$48;
> /* 101 */ long result$49;
> /* 102 */ boolean isNull$50;
> /* 103 */ long result$51;
> /* 104 */ boolean isNull$52;
> /* 105 */ long result$53;
> /* 106 */ boolean isNull$55;
> /* 107 */ long result$56;
> /* 108 */ boolean isNull$57;
> /* 109 */ long result$58;
> /* 110 */
> /* 111 */
> /* 112 */ if (!in1.isNullAt(1)) {
> /* 113 */ hasInput = true;
> /* 114 */ // input field access for group key projection,
> window/pane assign
> /* 115 */ // and aggregate map update
> /* 116 */ isNull$11 = in1.isNullAt(0);
> /* 117 */ field$11 = -1;
> /* 118 */ if (!isNull$11) {
> /* 119 */ field$11 = in1.getInt(0);
> /* 120 */ }
> /* 121 */ isNull$21 = in1.isNullAt(1);
> /* 122 */ field$21 = null;
> /* 123 */ if (!isNull$21) {
> /* 124 */ field$21 = in1.getTimestamp(1, 3);
> /* 125 */ }
> /* 126 */ // assign timestamp(window or pane)
> /* 127 */
> /* 128 */
> /* 129 */
> /* 130 */
> /* 131 */
> /* 132 */ isNull$22 = isNull$21;
> /* 133 */ result$23 = -1L;
> /* 134 */ if (!isNull$22) {
> /* 135 */
> /* 136 */ result$23 = field$21.getMillisecond();
> /* 137 */
> /* 138 */ }
> /* 139 */
> /* 140 */
> /* 141 */ isNull$24 = isNull$22 || false;
> /* 142 */ result$25 = -1L;
> /* 143 */ if (!isNull$24) {
> /* 144 */
> /* 145 */ result$25 = (long) (result$23 * ((long) 1L));
> /* 146 */
> /* 147 */ }
> /* 148 */
> /* 149 */ isNull$26 = isNull$21;
> /* 150 */ result$27 = -1L;
> /* 151 */ if (!isNull$26) {
> /* 152 */
> /* 153 */ result$27 = field$21.getMillisecond();
> /* 154 */
> /* 155 */ }
> /* 156 */
> /* 157 */
> /* 158 */ isNull$28 = isNull$26 || false;
> /* 159 */ result$29 = -1L;
> /* 160 */ if (!isNull$28) {
> /* 161 */
> /* 162 */ result$29 = (long) (result$27 * ((long) 1L));
> /* 163 */
> /* 164 */ }
> /* 165 */
> /* 166 */
> /* 167 */ isNull$30 = isNull$28 || false;
> /* 168 */ result$31 = -1L;
> /* 169 */ if (!isNull$30) {
> /* 170 */
> /* 171 */ result$31 = (long) (result$29 - ((long) 0L));
> /* 172 */
> /* 173 */ }
> /* 174 */
> /* 175 */
> /* 176 */ isNull$32 = isNull$30 || false;
> /* 177 */ result$33 = -1L;
> /* 178 */ if (!isNull$32) {
> /* 179 */
> /* 180 */ result$33 = (long) (result$31 % ((long) 432000000L));
> /* 181 */
> /* 182 */ }
> /* 183 */
> /* 184 */
> /* 185 */ isNull$34 = isNull$32 || false;
> /* 186 */ result$35 = false;
> /* 187 */ if (!isNull$34) {
> /* 188 */
> /* 189 */ result$35 = result$33 < ((int) 0);
> /* 190 */
> /* 191 */ }
> /* 192 */
> /* 193 */ long result$54 = -1L;
> /* 194 */ boolean isNull$54;
> /* 195 */ if (result$35) {
> /* 196 */
> /* 197 */
> /* 198 */
> /* 199 */
> /* 200 */
> /* 201 */
> /* 202 */ isNull$36 = isNull$21;
> /* 203 */ result$37 = -1L;
> /* 204 */ if (!isNull$36) {
> /* 205 */
> /* 206 */ result$37 = field$21.getMillisecond();
> /* 207 */
> /* 208 */ }
> /* 209 */
> /* 210 */
> /* 211 */ isNull$38 = isNull$36 || false;
> /* 212 */ result$39 = -1L;
> /* 213 */ if (!isNull$38) {
> /* 214 */
> /* 215 */ result$39 = (long) (result$37 * ((long) 1L));
> /* 216 */
> /* 217 */ }
> /* 218 */
> /* 219 */
> /* 220 */ isNull$40 = isNull$38 || false;
> /* 221 */ result$41 = -1L;
> /* 222 */ if (!isNull$40) {
> /* 223 */
> /* 224 */ result$41 = (long) (result$39 - ((long) 0L));
> /* 225 */
> /* 226 */ }
> /* 227 */
> /* 228 */
> /* 229 */ isNull$42 = isNull$40 || false;
> /* 230 */ result$43 = -1L;
> /* 231 */ if (!isNull$42) {
> /* 232 */
> /* 233 */ result$43 = (long) (result$41 % ((long) 432000000L));
> /* 234 */
> /* 235 */ }
> /* 236 */
> /* 237 */
> /* 238 */ isNull$44 = isNull$42 || false;
> /* 239 */ result$45 = -1L;
> /* 240 */ if (!isNull$44) {
> /* 241 */
> /* 242 */ result$45 = (long) (result$43 + ((long) 432000000L));
> /* 243 */
> /* 244 */ }
> /* 245 */
> /* 246 */ isNull$54 = isNull$44;
> /* 247 */ if (!isNull$54) {
> /* 248 */ result$54 = result$45;
> /* 249 */ }
> /* 250 */ }
> /* 251 */ else {
> /* 252 */
> /* 253 */
> /* 254 */
> /* 255 */
> /* 256 */
> /* 257 */ isNull$46 = isNull$21;
> /* 258 */ result$47 = -1L;
> /* 259 */ if (!isNull$46) {
> /* 260 */
> /* 261 */ result$47 = field$21.getMillisecond();
> /* 262 */
> /* 263 */ }
> /* 264 */
> /* 265 */
> /* 266 */ isNull$48 = isNull$46 || false;
> /* 267 */ result$49 = -1L;
> /* 268 */ if (!isNull$48) {
> /* 269 */
> /* 270 */ result$49 = (long) (result$47 * ((long) 1L));
> /* 271 */
> /* 272 */ }
> /* 273 */
> /* 274 */
> /* 275 */ isNull$50 = isNull$48 || false;
> /* 276 */ result$51 = -1L;
> /* 277 */ if (!isNull$50) {
> /* 278 */
> /* 279 */ result$51 = (long) (result$49 - ((long) 0L));
> /* 280 */
> /* 281 */ }
> /* 282 */
> /* 283 */
> /* 284 */ isNull$52 = isNull$50 || false;
> /* 285 */ result$53 = -1L;
> /* 286 */ if (!isNull$52) {
> /* 287 */
> /* 288 */ result$53 = (long) (result$51 % ((long) 432000000L));
> /* 289 */
> /* 290 */ }
> /* 291 */
> /* 292 */ isNull$54 = isNull$52;
> /* 293 */ if (!isNull$54) {
> /* 294 */ result$54 = result$53;
> /* 295 */ }
> /* 296 */ }
> /* 297 */ isNull$55 = isNull$24 || isNull$54;
> /* 298 */ result$56 = -1L;
> /* 299 */ if (!isNull$55) {
> /* 300 */
> /* 301 */ result$56 = (long) (result$25 - result$54);
> /* 302 */
> /* 303 */ }
> /* 304 */
> /* 305 */
> /* 306 */ isNull$57 = isNull$55 || false;
> /* 307 */ result$58 = -1L;
> /* 308 */ if (!isNull$57) {
> /* 309 */
> /* 310 */ result$58 = (long) (result$56 - ((long) 0L));
> /* 311 */
> /* 312 */ }
> /* 313 */
> /* 314 */ // process each input
> /* 315 */
> /* 316 */ // build aggregate map key
> /* 317 */
> /* 318 */
> /* 319 */ aggMapKeyWriter$4.reset();
> /* 320 */
> /* 321 */
> /* 322 */ if (false) {
> /* 323 */ aggMapKeyWriter$4.setNullAt(0);
> /* 324 */ } else {
> /* 325 */ aggMapKeyWriter$4.writeLong(0, result$58);
> /* 326 */ }
> /* 327 */
> /* 328 */ aggMapKeyWriter$4.complete();
> /* 329 */
> /* 330 */ // aggregate by each input with assigned timestamp
> /* 331 */ // look up output buffer using current key (grouping
> keys ..., assigned timestamp)
> /* 332 */ lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3);
> /* 333 */ currentAggBuffer$8 = lookupInfo$20.getValue();
> /* 334 */ if (!lookupInfo$20.isFound()) {
> /* 335 */
> /* 336 */ // append empty agg buffer into aggregate map for
> current group key
> /* 337 */ try {
> /* 338 */ currentAggBuffer$8 =
> /* 339 */ aggregateMap$7.append(lookupInfo$20,
> emptyAggBuffer$9);
> /* 340 */ } catch (java.io.EOFException exp) {
> /* 341 */
> /* 342 */ LOG$2.info("BytesHashMap out of memory with {} entries,
> output directly.", aggregateMap$7.getNumElements());
> /* 343 */ // hash map out of memory, output directly
> /* 344 */
> /* 345 */
> org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry>
> iterator =
> /* 346 */ aggregateMap$7.getEntryIterator();
> /* 347 */ while (iterator.next(reuseAggMapEntry$19) != null) {
> /* 348 */
> /* 349 */
> /* 350 */
> /* 351 */ hashAggOutput.replace(reuseAggMapKey$17,
> reuseAggBuffer$18);
> /* 352 */
> /* 353 */ output.collect(outElement.replace(hashAggOutput));
> /* 354 */ }
> /* 355 */
> /* 356 */ // retry append
> /* 357 */
> /* 358 */ // reset aggregate map retry append
> /* 359 */ aggregateMap$7.reset();
> /* 360 */ lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3);
> /* 361 */ try {
> /* 362 */ currentAggBuffer$8 =
> /* 363 */ aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9);
> /* 364 */ } catch (java.io.EOFException e) {
> /* 365 */ throw new OutOfMemoryError("BytesHashMap Out of
> Memory.");
> /* 366 */ }
> /* 367 */
> /* 368 */
> /* 369 */ }
> /* 370 */ }
> /* 371 */ // aggregate buffer fields access
> /* 372 */ isNull$12 = currentAggBuffer$8.isNullAt(0);
> /* 373 */ field$12 = -1;
> /* 374 */ if (!isNull$12) {
> /* 375 */ field$12 = currentAggBuffer$8.getInt(0);
> /* 376 */ }
> /* 377 */ // do aggregate and update agg buffer
> /* 378 */ int result$16 = -1;
> /* 379 */ boolean isNull$16;
> /* 380 */ if (isNull$11) {
> /* 381 */
> /* 382 */ isNull$16 = isNull$12;
> /* 383 */ if (!isNull$16) {
> /* 384 */ result$16 = field$12;
> /* 385 */ }
> /* 386 */ }
> /* 387 */ else {
> /* 388 */ int result$15 = -1;
> /* 389 */ boolean isNull$15;
> /* 390 */ if (isNull$12) {
> /* 391 */
> /* 392 */ isNull$15 = isNull$11;
> /* 393 */ if (!isNull$15) {
> /* 394 */ result$15 = field$11;
> /* 395 */ }
> /* 396 */ }
> /* 397 */ else {
> /* 398 */
> /* 399 */
> /* 400 */
> /* 401 */ isNull$13 = isNull$12 || isNull$11;
> /* 402 */ result$14 = -1;
> /* 403 */ if (!isNull$13) {
> /* 404 */
> /* 405 */ result$14 = (int) (field$12 + field$11);
> /* 406 */
> /* 407 */ }
> /* 408 */
> /* 409 */ isNull$15 = isNull$13;
> /* 410 */ if (!isNull$15) {
> /* 411 */ result$15 = result$14;
> /* 412 */ }
> /* 413 */ }
> /* 414 */ isNull$16 = isNull$15;
> /* 415 */ if (!isNull$16) {
> /* 416 */ result$16 = result$15;
> /* 417 */ }
> /* 418 */ }
> /* 419 */ if (isNull$16) {
> /* 420 */ currentAggBuffer$8.setNullAt(0);
> /* 421 */ } else {
> /* 422 */ currentAggBuffer$8.setInt(0, result$16);
> /* 423 */ }
> /* 424 */
> /* 425 */ }
> /* 426 */ }
> /* 427 */
> /* 428 */
> /* 429 */ @Override
> /* 430 */ public void endInput() throws Exception {
> /* 431 */ org.apache.flink.table.data.binary.BinaryRowData
> currentAggBuffer$8;
> /* 432 */ int field$11;
> /* 433 */ boolean isNull$11;
> /* 434 */ int field$12;
> /* 435 */ boolean isNull$12;
> /* 436 */ boolean isNull$13;
> /* 437 */ int result$14;
> /* 438 */
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo
> lookupInfo$20;
> /* 439 */ org.apache.flink.table.data.TimestampData field$21;
> /* 440 */ boolean isNull$21;
> /* 441 */ boolean isNull$22;
> /* 442 */ long result$23;
> /* 443 */ boolean isNull$24;
> /* 444 */ long result$25;
> /* 445 */ boolean isNull$26;
> /* 446 */ long result$27;
> /* 447 */ boolean isNull$28;
> /* 448 */ long result$29;
> /* 449 */ boolean isNull$30;
> /* 450 */ long result$31;
> /* 451 */ boolean isNull$32;
> /* 452 */ long result$33;
> /* 453 */ boolean isNull$34;
> /* 454 */ boolean result$35;
> /* 455 */ boolean isNull$36;
> /* 456 */ long result$37;
> /* 457 */ boolean isNull$38;
> /* 458 */ long result$39;
> /* 459 */ boolean isNull$40;
> /* 460 */ long result$41;
> /* 461 */ boolean isNull$42;
> /* 462 */ long result$43;
> /* 463 */ boolean isNull$44;
> /* 464 */ long result$45;
> /* 465 */ boolean isNull$46;
> /* 466 */ long result$47;
> /* 467 */ boolean isNull$48;
> /* 468 */ long result$49;
> /* 469 */ boolean isNull$50;
> /* 470 */ long result$51;
> /* 471 */ boolean isNull$52;
> /* 472 */ long result$53;
> /* 473 */ boolean isNull$55;
> /* 474 */ long result$56;
> /* 475 */ boolean isNull$57;
> /* 476 */ long result$58;
> /* 477 */
> /* 478 */
> org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry>
> iterator =
> /* 479 */ aggregateMap$7.getEntryIterator();
> /* 480 */ while (iterator.next(reuseAggMapEntry$19) != null) {
> /* 481 */
> /* 482 */
> /* 483 */
> /* 484 */ hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18);
> /* 485 */
> /* 486 */ output.collect(outElement.replace(hashAggOutput));
> /* 487 */ }
> /* 488 */
> /* 489 */ }
> /* 490 */
> /* 491 */
> /* 492 */ @Override
> /* 493 */ public void close() throws Exception {
> /* 494 */ super.close();
> /* 495 */ aggregateMap$7.free();
> /* 496 */
> /* 497 */ }
> /* 498 */
> /* 499 */
> /* 500 */ }
> /* 501 */
> Traceback (most recent call last):
> File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py",
> line 32, in <module>
> print(table.to_pandas())
> File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 829,
> in to_pandas
> if batches.hasNext():
> File
> "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py",
> line 1285, in __call__
> return_value = get_return_value(
> File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line
> 147, in deco
> return f(*a, **kw)
> File
> "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py",
> line 326, in get_return_value
> raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o51.hasNext.
> : java.lang.RuntimeException: Failed to fetch next result
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
> at
> org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
> at
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
> at
> org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
> at
> org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.io.IOException: Failed to fetch job execution result
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
> ... 16 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
> ... 18 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
> at
> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
> at
> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
> at
> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
> ... 19 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:217)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:210)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:204)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:526)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:413)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not instantiate generated class
> 'LocalHashWinAggWithoutKeys$59'
> at
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
> at
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:613)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be
> compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> at
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> at
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
> ... 13 more
> Caused by:
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be
> compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> ... 15 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program
> cannot be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> ... 18 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 351, Column
> 33: A method named "replace" is not declared in any enclosing class nor any
> supertype, nor through a static import
> at
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
> at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
> at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$Block.accept(Java.java:2776)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1842)
> at org.codehaus.janino.UnitCompiler.access$2200(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1498)
> at
> org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$WhileStatement.accept(Java.java:3052)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
> at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$Block.accept(Java.java:2776)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileTryCatch(UnitCompiler.java:3136)
> at
> org.codehaus.janino.UnitCompiler.compileTryCatchFinally(UnitCompiler.java:2966)
> at
> org.codehaus.janino.UnitCompiler.compileTryCatchFinallyWithResources(UnitCompiler.java:2770)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2742)
> at org.codehaus.janino.UnitCompiler.access$2300(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1499)
> at
> org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$TryStatement.accept(Java.java:3238)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
> at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$Block.accept(Java.java:2776)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476)
> at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
> at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
> at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$Block.accept(Java.java:2776)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476)
> at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
> at
> org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> at
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> ... 24 more
> {code}
> However, it works fine in streaming mode:
> {code:python}
> env_settings = (
>
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> )
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> {code}
> How the table is created seems irrelevant - this raises the same error:
> {code:python}
> from datetime import datetime
> from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings
> from pyflink.table.window import Tumble
> env_settings = (
>
> EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
> )
> table_environment = BatchTableEnvironment.create(environment_settings=env_settings)
> transactions = table_environment.from_elements(
> [
> (1, datetime(2000, 1, 1, 0, 0, 0)),
> (-2, datetime(2000, 1, 2, 0, 0, 0)),
> (3, datetime(2000, 1, 3, 0, 0, 0)),
> (-4, datetime(2000, 1, 4, 0, 0, 0)),
> ],
> DataTypes.ROW(
> [
> DataTypes.FIELD("amount", DataTypes.BIGINT()),
> DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
> ]
> ),
> )
> table = (
> transactions
> .window(Tumble.over("5.days").on("ts").alias("__window"))
> .group_by("__window")
> .select("amount.sum")
> )
> print(table.to_pandas())
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)