[ https://issues.apache.org/jira/browse/FLINK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324112#comment-17324112 ]
Flink Jira Bot commented on FLINK-19739:
----------------------------------------
This issue is assigned but has not received an update in 7 days, so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign it so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> CompileException when windowing in batch mode: A method named "replace" is not declared in any enclosing class nor any supertype
> ---------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19739
> URL: https://issues.apache.org/jira/browse/FLINK-19739
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.2, 1.12.0
> Environment: Ubuntu 18.04
> Python 3.8, with a jar built from master yesterday.
> Or Python 3.7, with the latest version installed from pip.
> Reporter: Alex Hall
> Assignee: Jingsong Lee
> Priority: Major
> Labels: stale-assigned
>
> Example script:
> {code:python}
> from pyflink.table import EnvironmentSettings, BatchTableEnvironment
> from pyflink.table.window import Tumble
>
> env_settings = (
>     EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
> )
> table_env = BatchTableEnvironment.create(environment_settings=env_settings)
>
> table_env.execute_sql(
>     """
>     CREATE TABLE table1 (
>         amount INT,
>         ts TIMESTAMP(3),
>         WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>     ) WITH (
>         'connector.type' = 'filesystem',
>         'format.type' = 'csv',
>         'connector.path' = '/home/alex/work/test-flink/data1.csv'
>     )
>     """
> )
>
> table1 = table_env.from_path("table1")
>
> table = (
>     table1
>     .window(Tumble.over("5.days").on("ts").alias("__window"))
>     .group_by("__window")
>     .select("amount.sum")
> )
> print(table.to_pandas())
> {code}
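> The script reads /home/alex/work/test-flink/data1.csv. A hypothetical sample matching the (amount INT, ts TIMESTAMP(3)) schema, mirroring the from_elements data used in the second repro at the end of this report:
> {code}
> 1,2000-01-01 00:00:00
> -2,2000-01-02 00:00:00
> 3,2000-01-03 00:00:00
> -4,2000-01-04 00:00:00
> {code}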
> Output:
> {code}
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil (file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar) to constructor java.nio.DirectByteBuffer(long,int)
> WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> /* 1 */
> /* 2 */ public class LocalHashWinAggWithoutKeys$59 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator, org.apache.flink.streaming.api.operators.BoundedOneInput {
> /* 4 */
> /* 5 */   private final Object[] references;
> /* 6 */
> /* 7 */   private static final org.slf4j.Logger LOG$2 =
> /* 8 */       org.slf4j.LoggerFactory.getLogger("LocalHashWinAgg");
> /* 9 */
> /* 10 */   private transient org.apache.flink.table.types.logical.LogicalType[] aggMapKeyTypes$5;
> /* 11 */   private transient org.apache.flink.table.types.logical.LogicalType[] aggBufferTypes$6;
> /* 12 */   private transient org.apache.flink.table.runtime.operators.aggregate.BytesHashMap aggregateMap$7;
> /* 13 */   org.apache.flink.table.data.binary.BinaryRowData emptyAggBuffer$9 = new org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 14 */   org.apache.flink.table.data.writer.BinaryRowWriter emptyAggBufferWriterTerm$10 = new org.apache.flink.table.data.writer.BinaryRowWriter(emptyAggBuffer$9);
> /* 15 */   org.apache.flink.table.data.GenericRowData hashAggOutput = new org.apache.flink.table.data.GenericRowData(2);
> /* 16 */   private transient org.apache.flink.table.data.binary.BinaryRowData reuseAggMapKey$17 = new org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 17 */   private transient org.apache.flink.table.data.binary.BinaryRowData reuseAggBuffer$18 = new org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 18 */   private transient org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry reuseAggMapEntry$19 = new org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry(reuseAggMapKey$17, reuseAggBuffer$18);
> /* 19 */   org.apache.flink.table.data.binary.BinaryRowData aggMapKey$3 = new org.apache.flink.table.data.binary.BinaryRowData(1);
> /* 20 */   org.apache.flink.table.data.writer.BinaryRowWriter aggMapKeyWriter$4 = new org.apache.flink.table.data.writer.BinaryRowWriter(aggMapKey$3);
> /* 21 */   private boolean hasInput = false;
> /* 22 */   org.apache.flink.streaming.runtime.streamrecord.StreamRecord element = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord((Object)null);
> /* 23 */   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 24 */
> /* 25 */   public LocalHashWinAggWithoutKeys$59(
> /* 26 */       Object[] references,
> /* 27 */       org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 28 */       org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 29 */       org.apache.flink.streaming.api.operators.Output output,
> /* 30 */       org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> /* 31 */     this.references = references;
> /* 32 */     aggMapKeyTypes$5 = (((org.apache.flink.table.types.logical.LogicalType[]) references[0]));
> /* 33 */     aggBufferTypes$6 = (((org.apache.flink.table.types.logical.LogicalType[]) references[1]));
> /* 34 */     this.setup(task, config, output);
> /* 35 */     if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 36 */       ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 37 */           .setProcessingTimeService(processingTimeService);
> /* 38 */     }
> /* 39 */   }
> /* 40 */
> /* 41 */   @Override
> /* 42 */   public void open() throws Exception {
> /* 43 */     super.open();
> /* 44 */     aggregateMap$7 = new org.apache.flink.table.runtime.operators.aggregate.BytesHashMap(this.getContainingTask(), this.getContainingTask().getEnvironment().getMemoryManager(), computeMemorySize(), aggMapKeyTypes$5, aggBufferTypes$6);
> /* 45 */
> /* 46 */
> /* 47 */     emptyAggBufferWriterTerm$10.reset();
> /* 48 */
> /* 49 */
> /* 50 */     if (true) {
> /* 51 */       emptyAggBufferWriterTerm$10.setNullAt(0);
> /* 52 */     } else {
> /* 53 */       emptyAggBufferWriterTerm$10.writeInt(0, ((int) -1));
> /* 54 */     }
> /* 55 */
> /* 56 */     emptyAggBufferWriterTerm$10.complete();
> /* 57 */
> /* 58 */   }
> /* 59 */
> /* 60 */   @Override
> /* 61 */   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 62 */     org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
> /* 63 */
> /* 64 */     org.apache.flink.table.data.binary.BinaryRowData currentAggBuffer$8;
> /* 65 */     int field$11;
> /* 66 */     boolean isNull$11;
> /* 67 */     int field$12;
> /* 68 */     boolean isNull$12;
> /* 69 */     boolean isNull$13;
> /* 70 */     int result$14;
> /* 71 */     org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo lookupInfo$20;
> /* 72 */     org.apache.flink.table.data.TimestampData field$21;
> /* 73 */     boolean isNull$21;
> /* 74 */     boolean isNull$22;
> /* 75 */     long result$23;
> /* 76 */     boolean isNull$24;
> /* 77 */     long result$25;
> /* 78 */     boolean isNull$26;
> /* 79 */     long result$27;
> /* 80 */     boolean isNull$28;
> /* 81 */     long result$29;
> /* 82 */     boolean isNull$30;
> /* 83 */     long result$31;
> /* 84 */     boolean isNull$32;
> /* 85 */     long result$33;
> /* 86 */     boolean isNull$34;
> /* 87 */     boolean result$35;
> /* 88 */     boolean isNull$36;
> /* 89 */     long result$37;
> /* 90 */     boolean isNull$38;
> /* 91 */     long result$39;
> /* 92 */     boolean isNull$40;
> /* 93 */     long result$41;
> /* 94 */     boolean isNull$42;
> /* 95 */     long result$43;
> /* 96 */     boolean isNull$44;
> /* 97 */     long result$45;
> /* 98 */     boolean isNull$46;
> /* 99 */     long result$47;
> /* 100 */     boolean isNull$48;
> /* 101 */     long result$49;
> /* 102 */     boolean isNull$50;
> /* 103 */     long result$51;
> /* 104 */     boolean isNull$52;
> /* 105 */     long result$53;
> /* 106 */     boolean isNull$55;
> /* 107 */     long result$56;
> /* 108 */     boolean isNull$57;
> /* 109 */     long result$58;
> /* 110 */
> /* 111 */
> /* 112 */     if (!in1.isNullAt(1)) {
> /* 113 */       hasInput = true;
> /* 114 */       // input field access for group key projection, window/pane assign
> /* 115 */       // and aggregate map update
> /* 116 */       isNull$11 = in1.isNullAt(0);
> /* 117 */       field$11 = -1;
> /* 118 */       if (!isNull$11) {
> /* 119 */         field$11 = in1.getInt(0);
> /* 120 */       }
> /* 121 */       isNull$21 = in1.isNullAt(1);
> /* 122 */       field$21 = null;
> /* 123 */       if (!isNull$21) {
> /* 124 */         field$21 = in1.getTimestamp(1, 3);
> /* 125 */       }
> /* 126 */       // assign timestamp(window or pane)
> /* 127 */
> /* 128 */
> /* 129 */
> /* 130 */
> /* 131 */
> /* 132 */       isNull$22 = isNull$21;
> /* 133 */       result$23 = -1L;
> /* 134 */       if (!isNull$22) {
> /* 135 */
> /* 136 */         result$23 = field$21.getMillisecond();
> /* 137 */
> /* 138 */       }
> /* 139 */
> /* 140 */
> /* 141 */       isNull$24 = isNull$22 || false;
> /* 142 */       result$25 = -1L;
> /* 143 */       if (!isNull$24) {
> /* 144 */
> /* 145 */         result$25 = (long) (result$23 * ((long) 1L));
> /* 146 */
> /* 147 */       }
> /* 148 */
> /* 149 */       isNull$26 = isNull$21;
> /* 150 */       result$27 = -1L;
> /* 151 */       if (!isNull$26) {
> /* 152 */
> /* 153 */         result$27 = field$21.getMillisecond();
> /* 154 */
> /* 155 */       }
> /* 156 */
> /* 157 */
> /* 158 */       isNull$28 = isNull$26 || false;
> /* 159 */       result$29 = -1L;
> /* 160 */       if (!isNull$28) {
> /* 161 */
> /* 162 */         result$29 = (long) (result$27 * ((long) 1L));
> /* 163 */
> /* 164 */       }
> /* 165 */
> /* 166 */
> /* 167 */       isNull$30 = isNull$28 || false;
> /* 168 */       result$31 = -1L;
> /* 169 */       if (!isNull$30) {
> /* 170 */
> /* 171 */         result$31 = (long) (result$29 - ((long) 0L));
> /* 172 */
> /* 173 */       }
> /* 174 */
> /* 175 */
> /* 176 */       isNull$32 = isNull$30 || false;
> /* 177 */       result$33 = -1L;
> /* 178 */       if (!isNull$32) {
> /* 179 */
> /* 180 */         result$33 = (long) (result$31 % ((long) 432000000L));
> /* 181 */
> /* 182 */       }
> /* 183 */
> /* 184 */
> /* 185 */       isNull$34 = isNull$32 || false;
> /* 186 */       result$35 = false;
> /* 187 */       if (!isNull$34) {
> /* 188 */
> /* 189 */         result$35 = result$33 < ((int) 0);
> /* 190 */
> /* 191 */       }
> /* 192 */
> /* 193 */       long result$54 = -1L;
> /* 194 */       boolean isNull$54;
> /* 195 */       if (result$35) {
> /* 196 */
> /* 197 */
> /* 198 */
> /* 199 */
> /* 200 */
> /* 201 */
> /* 202 */         isNull$36 = isNull$21;
> /* 203 */         result$37 = -1L;
> /* 204 */         if (!isNull$36) {
> /* 205 */
> /* 206 */           result$37 = field$21.getMillisecond();
> /* 207 */
> /* 208 */         }
> /* 209 */
> /* 210 */
> /* 211 */         isNull$38 = isNull$36 || false;
> /* 212 */         result$39 = -1L;
> /* 213 */         if (!isNull$38) {
> /* 214 */
> /* 215 */           result$39 = (long) (result$37 * ((long) 1L));
> /* 216 */
> /* 217 */         }
> /* 218 */
> /* 219 */
> /* 220 */         isNull$40 = isNull$38 || false;
> /* 221 */         result$41 = -1L;
> /* 222 */         if (!isNull$40) {
> /* 223 */
> /* 224 */           result$41 = (long) (result$39 - ((long) 0L));
> /* 225 */
> /* 226 */         }
> /* 227 */
> /* 228 */
> /* 229 */         isNull$42 = isNull$40 || false;
> /* 230 */         result$43 = -1L;
> /* 231 */         if (!isNull$42) {
> /* 232 */
> /* 233 */           result$43 = (long) (result$41 % ((long) 432000000L));
> /* 234 */
> /* 235 */         }
> /* 236 */
> /* 237 */
> /* 238 */         isNull$44 = isNull$42 || false;
> /* 239 */         result$45 = -1L;
> /* 240 */         if (!isNull$44) {
> /* 241 */
> /* 242 */           result$45 = (long) (result$43 + ((long) 432000000L));
> /* 243 */
> /* 244 */         }
> /* 245 */
> /* 246 */         isNull$54 = isNull$44;
> /* 247 */         if (!isNull$54) {
> /* 248 */           result$54 = result$45;
> /* 249 */         }
> /* 250 */       }
> /* 251 */       else {
> /* 252 */
> /* 253 */
> /* 254 */
> /* 255 */
> /* 256 */
> /* 257 */         isNull$46 = isNull$21;
> /* 258 */         result$47 = -1L;
> /* 259 */         if (!isNull$46) {
> /* 260 */
> /* 261 */           result$47 = field$21.getMillisecond();
> /* 262 */
> /* 263 */         }
> /* 264 */
> /* 265 */
> /* 266 */         isNull$48 = isNull$46 || false;
> /* 267 */         result$49 = -1L;
> /* 268 */         if (!isNull$48) {
> /* 269 */
> /* 270 */           result$49 = (long) (result$47 * ((long) 1L));
> /* 271 */
> /* 272 */         }
> /* 273 */
> /* 274 */
> /* 275 */         isNull$50 = isNull$48 || false;
> /* 276 */         result$51 = -1L;
> /* 277 */         if (!isNull$50) {
> /* 278 */
> /* 279 */           result$51 = (long) (result$49 - ((long) 0L));
> /* 280 */
> /* 281 */         }
> /* 282 */
> /* 283 */
> /* 284 */         isNull$52 = isNull$50 || false;
> /* 285 */         result$53 = -1L;
> /* 286 */         if (!isNull$52) {
> /* 287 */
> /* 288 */           result$53 = (long) (result$51 % ((long) 432000000L));
> /* 289 */
> /* 290 */         }
> /* 291 */
> /* 292 */         isNull$54 = isNull$52;
> /* 293 */         if (!isNull$54) {
> /* 294 */           result$54 = result$53;
> /* 295 */         }
> /* 296 */       }
> /* 297 */       isNull$55 = isNull$24 || isNull$54;
> /* 298 */       result$56 = -1L;
> /* 299 */       if (!isNull$55) {
> /* 300 */
> /* 301 */         result$56 = (long) (result$25 - result$54);
> /* 302 */
> /* 303 */       }
> /* 304 */
> /* 305 */
> /* 306 */       isNull$57 = isNull$55 || false;
> /* 307 */       result$58 = -1L;
> /* 308 */       if (!isNull$57) {
> /* 309 */
> /* 310 */         result$58 = (long) (result$56 - ((long) 0L));
> /* 311 */
> /* 312 */       }
> /* 313 */
> /* 314 */       // process each input
> /* 315 */
> /* 316 */       // build aggregate map key
> /* 317 */
> /* 318 */
> /* 319 */       aggMapKeyWriter$4.reset();
> /* 320 */
> /* 321 */
> /* 322 */       if (false) {
> /* 323 */         aggMapKeyWriter$4.setNullAt(0);
> /* 324 */       } else {
> /* 325 */         aggMapKeyWriter$4.writeLong(0, result$58);
> /* 326 */       }
> /* 327 */
> /* 328 */       aggMapKeyWriter$4.complete();
> /* 329 */
> /* 330 */       // aggregate by each input with assigned timestamp
> /* 331 */       // look up output buffer using current key (grouping keys ..., assigned timestamp)
> /* 332 */       lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3);
> /* 333 */       currentAggBuffer$8 = lookupInfo$20.getValue();
> /* 334 */       if (!lookupInfo$20.isFound()) {
> /* 335 */
> /* 336 */         // append empty agg buffer into aggregate map for current group key
> /* 337 */         try {
> /* 338 */           currentAggBuffer$8 =
> /* 339 */               aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9);
> /* 340 */         } catch (java.io.EOFException exp) {
> /* 341 */
> /* 342 */           LOG$2.info("BytesHashMap out of memory with {} entries, output directly.", aggregateMap$7.getNumElements());
> /* 343 */           // hash map out of memory, output directly
> /* 344 */
> /* 345 */           org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> iterator =
> /* 346 */               aggregateMap$7.getEntryIterator();
> /* 347 */           while (iterator.next(reuseAggMapEntry$19) != null) {
> /* 348 */
> /* 349 */
> /* 350 */
> /* 351 */             hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18);
> /* 352 */
> /* 353 */             output.collect(outElement.replace(hashAggOutput));
> /* 354 */           }
> /* 355 */
> /* 356 */           // retry append
> /* 357 */
> /* 358 */           // reset aggregate map retry append
> /* 359 */           aggregateMap$7.reset();
> /* 360 */           lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3);
> /* 361 */           try {
> /* 362 */             currentAggBuffer$8 =
> /* 363 */                 aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9);
> /* 364 */           } catch (java.io.EOFException e) {
> /* 365 */             throw new OutOfMemoryError("BytesHashMap Out of Memory.");
> /* 366 */           }
> /* 367 */
> /* 368 */
> /* 369 */         }
> /* 370 */       }
> /* 371 */       // aggregate buffer fields access
> /* 372 */       isNull$12 = currentAggBuffer$8.isNullAt(0);
> /* 373 */       field$12 = -1;
> /* 374 */       if (!isNull$12) {
> /* 375 */         field$12 = currentAggBuffer$8.getInt(0);
> /* 376 */       }
> /* 377 */       // do aggregate and update agg buffer
> /* 378 */       int result$16 = -1;
> /* 379 */       boolean isNull$16;
> /* 380 */       if (isNull$11) {
> /* 381 */
> /* 382 */         isNull$16 = isNull$12;
> /* 383 */         if (!isNull$16) {
> /* 384 */           result$16 = field$12;
> /* 385 */         }
> /* 386 */       }
> /* 387 */       else {
> /* 388 */         int result$15 = -1;
> /* 389 */         boolean isNull$15;
> /* 390 */         if (isNull$12) {
> /* 391 */
> /* 392 */           isNull$15 = isNull$11;
> /* 393 */           if (!isNull$15) {
> /* 394 */             result$15 = field$11;
> /* 395 */           }
> /* 396 */         }
> /* 397 */         else {
> /* 398 */
> /* 399 */
> /* 400 */
> /* 401 */           isNull$13 = isNull$12 || isNull$11;
> /* 402 */           result$14 = -1;
> /* 403 */           if (!isNull$13) {
> /* 404 */
> /* 405 */             result$14 = (int) (field$12 + field$11);
> /* 406 */
> /* 407 */           }
> /* 408 */
> /* 409 */           isNull$15 = isNull$13;
> /* 410 */           if (!isNull$15) {
> /* 411 */             result$15 = result$14;
> /* 412 */           }
> /* 413 */         }
> /* 414 */         isNull$16 = isNull$15;
> /* 415 */         if (!isNull$16) {
> /* 416 */           result$16 = result$15;
> /* 417 */         }
> /* 418 */       }
> /* 419 */       if (isNull$16) {
> /* 420 */         currentAggBuffer$8.setNullAt(0);
> /* 421 */       } else {
> /* 422 */         currentAggBuffer$8.setInt(0, result$16);
> /* 423 */       }
> /* 424 */
> /* 425 */     }
> /* 426 */   }
> /* 427 */
> /* 428 */
> /* 429 */   @Override
> /* 430 */   public void endInput() throws Exception {
> /* 431 */     org.apache.flink.table.data.binary.BinaryRowData currentAggBuffer$8;
> /* 432 */     int field$11;
> /* 433 */     boolean isNull$11;
> /* 434 */     int field$12;
> /* 435 */     boolean isNull$12;
> /* 436 */     boolean isNull$13;
> /* 437 */     int result$14;
> /* 438 */     org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo lookupInfo$20;
> /* 439 */     org.apache.flink.table.data.TimestampData field$21;
> /* 440 */     boolean isNull$21;
> /* 441 */     boolean isNull$22;
> /* 442 */     long result$23;
> /* 443 */     boolean isNull$24;
> /* 444 */     long result$25;
> /* 445 */     boolean isNull$26;
> /* 446 */     long result$27;
> /* 447 */     boolean isNull$28;
> /* 448 */     long result$29;
> /* 449 */     boolean isNull$30;
> /* 450 */     long result$31;
> /* 451 */     boolean isNull$32;
> /* 452 */     long result$33;
> /* 453 */     boolean isNull$34;
> /* 454 */     boolean result$35;
> /* 455 */     boolean isNull$36;
> /* 456 */     long result$37;
> /* 457 */     boolean isNull$38;
> /* 458 */     long result$39;
> /* 459 */     boolean isNull$40;
> /* 460 */     long result$41;
> /* 461 */     boolean isNull$42;
> /* 462 */     long result$43;
> /* 463 */     boolean isNull$44;
> /* 464 */     long result$45;
> /* 465 */     boolean isNull$46;
> /* 466 */     long result$47;
> /* 467 */     boolean isNull$48;
> /* 468 */     long result$49;
> /* 469 */     boolean isNull$50;
> /* 470 */     long result$51;
> /* 471 */     boolean isNull$52;
> /* 472 */     long result$53;
> /* 473 */     boolean isNull$55;
> /* 474 */     long result$56;
> /* 475 */     boolean isNull$57;
> /* 476 */     long result$58;
> /* 477 */
> /* 478 */     org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> iterator =
> /* 479 */         aggregateMap$7.getEntryIterator();
> /* 480 */     while (iterator.next(reuseAggMapEntry$19) != null) {
> /* 481 */
> /* 482 */
> /* 483 */
> /* 484 */       hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18);
> /* 485 */
> /* 486 */       output.collect(outElement.replace(hashAggOutput));
> /* 487 */     }
> /* 488 */
> /* 489 */   }
> /* 490 */
> /* 491 */
> /* 492 */   @Override
> /* 493 */   public void close() throws Exception {
> /* 494 */     super.close();
> /* 495 */     aggregateMap$7.free();
> /* 496 */
> /* 497 */   }
> /* 498 */
> /* 499 */
> /* 500 */ }
> /* 501 */
> Traceback (most recent call last):
>   File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py", line 32, in <module>
>     print(table.to_pandas())
>   File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 829, in to_pandas
>     if batches.hasNext():
>   File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
>     return_value = get_return_value(
>   File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line 147, in deco
>     return f(*a, **kw)
>   File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o51.hasNext.
> : java.lang.RuntimeException: Failed to fetch next result
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
>     at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
>     at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
>     at org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
>     at org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.io.IOException: Failed to fetch job execution result
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
>     ... 16 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
>     at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172)
>     ... 18 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
>     at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>     at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>     at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
>     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117)
>     ... 19 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:217)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:210)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:204)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:526)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:413)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not instantiate generated class 'LocalHashWinAggWithoutKeys$59'
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>     at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:613)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
>     ... 13 more
> Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
>     ... 15 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
>     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>     ... 18 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 351, Column 33: A method named "replace" is not declared in any enclosing class nor any supertype, nor through a static import
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
>     at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$Block.accept(Java.java:2776)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1842)
>     at org.codehaus.janino.UnitCompiler.access$2200(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1498)
>     at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$WhileStatement.accept(Java.java:3052)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
>     at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$Block.accept(Java.java:2776)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileTryCatch(UnitCompiler.java:3136)
>     at org.codehaus.janino.UnitCompiler.compileTryCatchFinally(UnitCompiler.java:2966)
>     at org.codehaus.janino.UnitCompiler.compileTryCatchFinallyWithResources(UnitCompiler.java:2770)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2742)
>     at org.codehaus.janino.UnitCompiler.access$2300(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1499)
>     at org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$TryStatement.accept(Java.java:3238)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
>     at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$Block.accept(Java.java:2776)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476)
>     at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
>     at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
>     at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
>     at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$Block.accept(Java.java:2776)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476)
>     at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
>     at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
>     ... 24 more
> {code}
> From the generated code above, the failure appears to be that {{hashAggOutput}} is declared as a {{org.apache.flink.table.data.GenericRowData}} (line 15), while lines 351 and 484 call {{hashAggOutput.replace(...)}}; {{GenericRowData}} declares no {{replace}} method, which matches the Janino error at line 351.
> However, the same query works fine in streaming mode:
> {code:python}
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = (
>     EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> )
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> {code}
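> For completeness, a self-contained sketch of the streaming variant, assuming the same data1.csv source and schema as the first script (the only intended difference is the environment):
> {code:python}
> # Sketch: same pipeline as the batch script above, but in streaming mode,
> # where the generated window aggregation compiles and runs.
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> from pyflink.table.window import Tumble
>
> env_settings = (
>     EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> )
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> table_env.execute_sql(
>     """
>     CREATE TABLE table1 (
>         amount INT,
>         ts TIMESTAMP(3),
>         WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>     ) WITH (
>         'connector.type' = 'filesystem',
>         'format.type' = 'csv',
>         'connector.path' = '/home/alex/work/test-flink/data1.csv'
>     )
>     """
> )
>
> table = (
>     table_env.from_path("table1")
>     .window(Tumble.over("5.days").on("ts").alias("__window"))
>     .group_by("__window")
>     .select("amount.sum")
> )
> print(table.to_pandas())
> {code}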
> How the table is created seems irrelevant; this raises the same error:
> {code:python}
> from datetime import datetime
>
> from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings
> from pyflink.table.window import Tumble
>
> env_settings = (
>     EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
> )
> table_environment = BatchTableEnvironment.create(environment_settings=env_settings)
>
> transactions = table_environment.from_elements(
>     [
>         (1, datetime(2000, 1, 1, 0, 0, 0)),
>         (-2, datetime(2000, 1, 2, 0, 0, 0)),
>         (3, datetime(2000, 1, 3, 0, 0, 0)),
>         (-4, datetime(2000, 1, 4, 0, 0, 0)),
>     ],
>     DataTypes.ROW(
>         [
>             DataTypes.FIELD("amount", DataTypes.BIGINT()),
>             DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
>         ]
>     ),
> )
>
> table = (
>     transactions
>     .window(Tumble.over("5.days").on("ts").alias("__window"))
>     .group_by("__window")
>     .select("amount.sum")
> )
> print(table.to_pandas())
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)