[
https://issues.apache.org/jira/browse/FLINK-24885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
wangbaohua updated FLINK-24885:
-------------------------------
Priority: Blocker (was: Major)
> ProcessElement Interface parameter Collector : java.lang.NullPointerException
> ------------------------------------------------------------------------------
>
> Key: FLINK-24885
> URL: https://issues.apache.org/jira/browse/FLINK-24885
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.1
> Reporter: wangbaohua
> Priority: Blocker
>
> 2021-11-12 17:11:44,024 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Co-Process-Broadcast-Keyed -> Map ->
> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_8,
> type=*com.asap.demo.model.BeanField<`account` STRING, `accountId` STRING,
> `accountIn` STRING, `accountInName` STRING, `accountInOrgId` STRING,
> `accountInOrgName` STRING, `accountInType` STRING, `accountName` STRING,
> `accountOrgId` STRING, `accountOrgName` STRING, `accountOut` STRING,
> `accountOutName` STRING, `accountOutOrgId` STRING, `accountOutOrgName`
> STRING, `accountOutType` STRING, `accountStatus` STRING, `accountType`
> STRING, `action` STRING, `actionDesc` STRING, `alarmcontext` STRING,
> `alarmgrade` STRING, `alarmtype` STRING, `alertId` STRING, `alertInfo`
> STRING, `alertLevel` STRING, `alertSignatureIdL` STRING, `appId` STRING,
> `appName` STRING, `appProtocol` STRING, `appType` STRING, `areaId` STRING,
> `areaName` STRING, `areaType` STRING, `assetFrom` STRING, `assetId` STRING,
> `assetInfo` STRING, `assetIp` STRING, `assetLevel` STRING, `assetName`
> STRING, `assetPid` STRING, `assetType` STRING, `assetUse` STRING,
> `assetVendor` STRING, `attackStage` STRING, `attackStageCode` STRING,
> `attackType` STRING, `attackTypeName` STRING, `authSerNum` STRING, `authTime`
> STRING, `authType` STRING, `bankSeqNum` STRING, `batchNo` STRING,
> `blackDomain` STRING, `blackDomainDesc` STRING, `blackDomainTag` STRING,
> `blackDstIp` STRING, `blackFile` STRING, `blackFileDesc` STRING,
> `blackFileTag` STRING, `blackId` STRING, `blackIpTag` STRING, `blackSrcIp`
> STRING, `blackTag` STRING, `blackTagMatchCount` STRING, `blackTagMatchDesc`
> STRING, `blackUrl` STRING, `blackUrlDesc` STRING, `blackUrlTag` STRING,
> `blackVulnCve` STRING, `blackVulnDesc` STRING, `blackVulnName` STRING,
> `blackVulnTag` STRING, `branchId` STRING, `branchName` STRING,
> `businessSystemName` STRING, `businessType` STRING, `cardId` STRING,
> `cascadeSourceIp` STRING, `cascadeSourceName` STRING, `cebUid` STRING,
> `certNum` STRING, `certType` STRING, `chainId` STRING, `channel` STRING,
> `channelId` STRING, `character` STRING, `charge` STRING, `cifSeqNum` STRING,
> `clientInfo` STRING, `clientIp` STRING, `clientMac` STRING, `clientName`
> STRING, `clientPort` STRING, `collectTime` TIMESTAMP_LTZ(9), `collectTimeL`
> TIMESTAMP_LTZ(9), `command` STRING, `commandLine` STRING, `commandResult`
> STRING, `company` STRING, `companyCustomId` STRING, `companyId` STRING,
> `completenessTag` STRING, `confidence` STRING, `confidenceLevel` STRING,
> `consignedUser` STRING, `contractNo` STRING, `count` STRING, `couponAmount`
> STRING, `couponId` STRING, `createTime` TIMESTAMP_LTZ(3), `createTimeL`
> BIGINT, `createdBy` STRING, `curType` STRING, `currency` STRING, `currentBal`
> STRING, `customLabel1` STRING, `customLabel10` STRING, `customLabel2` STRING,
> `customLabel3` STRING, `customLabel4` STRING, `customLabel5` STRING,
> `customLabel6` STRING, `customLabel7` STRING, `customLabel8` STRING,
> `customLabel9` STRING, `customValue1` STRING, `customValue10` STRING,
> `customValue2` STRING, `customValue3` STRING, `customValue4` STRING,
> `customValue5` STRING, `customValue6` STRING, `customValue7` STRING,
> `customValue8` STRING, `customValue9` STRING, `dataQualityTag` STRING,
> `dataType` STRING, `dataTypeName` STRING, `dbInstance` STRING, `dbName`
> STRING, `dbTable` STRING, `dbVersion` STRING, `dealSuggest` STRING,
> `defVManagerId` STRING, `department` STRING, `deviceCategory` STRING,
> `deviceId` STRING, `deviceIp` STRING, `deviceMac` STRING, `deviceName`
> STRING, `deviceParentType` STRING, `deviceType` STRING, `deviceVersion`
> STRING, `direction` STRING, `directionDesc` STRING, `directionOfAttackTag`
> STRING, `domain` STRING, `dstAdminAccount` STRING, `dstAdminEmail` STRING,
> `dstAdminFOrgId` STRING, `dstAdminId` STRING, `dstAdminMobile` STRING,
> `dstAdminName` STRING, `dstAdminOrgId` STRING, `dstAdminOrgName` STRING,
> `dstAdminType` STRING, `dstAsset` STRING, `dstAssetId` STRING, `dstAssetInfo`
> STRING, `dstAssetKey` STRING, `dstAssetLevel` STRING, `dstAssetModel` STRING,
> `dstAssetName` STRING, `dstAssetPid` STRING, `dstAssetStatus` STRING,
> `dstAssetSubType` STRING, `dstAssetType` STRING, `dstAssetVendor` STRING,
> `dstBizId` STRING, `dstCity` STRING, `dstCompany` STRING, `dstCountry`
> STRING, `dstDbInstance` STRING, `dstDomainName` STRING, `dstFGroupId` STRING,
> `dstGroupId` STRING, `dstGroupName` STRING, `dstHostName` STRING,
> `dstIndustry` STRING, `dstIntelDesc` STRING, `dstIntelId` STRING,
> `dstIntelType` STRING, `dstInterface` STRING, `dstIp` STRING, `dstIpL`
> STRING, `dstLatitude` STRING, `dstLongitude` STRING, `dstMac` STRING,
> `dstManagerIp` STRING, `dstNatIp` STRING, `dstNatPort` STRING, `dstOperator`
> STRING, `dstOrgAdmin` STRING, `dstOrgId` STRING, `dstOrgName` STRING,
> `dstOsId` STRING, `dstPort` STRING, `dstPost` STRING, `dstProvince` STRING,
> `dstService` STRING, `dstSubDomainName` STRING, `dstUser` STRING, `dstZone`
> STRING, `duration` STRING, `empNum` STRING, `endTime` TIMESTAMP_LTZ(9),
> `engineName` STRING, `entryTime` TIMESTAMP_LTZ(9), `errorCode` STRING,
> `eventAppendix` STRING, `eventCount` STRING, `eventId` STRING, `eventIp`
> STRING, `eventName` STRING, `eventOneType` STRING, `eventOneTypeDesc` STRING,
> `eventOneTypeName` STRING, `eventParentType` STRING, `eventThreeType` STRING,
> `eventThreeTypeDesc` STRING, `eventThreeTypeName` STRING, `eventTwoType`
> STRING, `eventTwoTypeDesc` STRING, `eventTwoTypeName` STRING, `eventType`
> STRING, `fileHash` STRING, `fileName` STRING, `filePath` STRING, `fileSize`
> STRING, `fileType` STRING, `flag` STRING, `flow` STRING, `flowAvg` STRING,
> `flowDiscard` STRING, `flowDown` STRING, `flowMax` STRING, `flowNum` STRING,
> `flowUp` STRING, `groupId` STRING, `groupName` STRING, `id` STRING, `idCard`
> STRING, `indexTag` STRING, `infectionDstIp` STRING, `infectionDstName`
> STRING, `infectionFile` STRING, `infectionIp` STRING, `infectionSrcIp`
> STRING, `infectionSrcName` STRING, `installNum` STRING, `instance` STRING,
> `interestedIp` STRING, `intranetInternetTag` STRING, `ipType` STRING,
> `isBack` STRING, `jobTitle` STRING, `languageSign` STRING, `lastLoginTime`
> TIMESTAMP_LTZ(9), `lastUpdBy` STRING, `lastUpdTime` TIMESTAMP_LTZ(9),
> `latnId` STRING, `length` STRING, `location` STRING, `lockDesc` STRING,
> `lockTime` TIMESTAMP_LTZ(9), `logStatus` STRING, `logSubType` STRING,
> `logType` STRING, `loginTime` TIMESTAMP_LTZ(9), `loginType` STRING,
> `loginWay` STRING, `mailAdd` STRING, `mailIn` STRING, `mailOut` STRING,
> `mailRecipient` STRING, `mailSender` STRING, `mailSubject` STRING,
> `mailTotal` STRING, `mailType` STRING, `mainAccount` STRING,
> `mainAccountCreateTime` TIMESTAMP_LTZ(9), `mainAccountCreateUser` STRING,
> `mainAccountDesc` STRING, `mainAccountId` STRING, `mainAccountInvalidTime`
> TIMESTAMP_LTZ(9), `mainAccountLoginDateLast` STRING,
> `mainAccountLoginFailCount` STRING, `mainAccountModifyPwdTime`
> TIMESTAMP_LTZ(9), `mainAccountModifyTime` TIMESTAMP_LTZ(9),
> `mainAccountStatus` STRING, `mainAccountType` STRING, `mainAccountValidTime`
> TIMESTAMP_LTZ(9), `malwareName` STRING, `malwareSubType` STRING,
> `malwareType` STRING, `managerId` STRING, `managerIp` STRING, `managerTypeId`
> STRING, `menuDesc` STRING, `menuId` STRING, `menuName` STRING, `menuPid`
> STRING, `menuStatus` STRING, `menuType` STRING, `merchantId` STRING,
> `merchantName` STRING, `message` STRING, `method` STRING, `missingField`
> STRING, `model` STRING, `module` STRING, `moduleId` STRING, `name` STRING,
> `networkType` STRING, `newValue` STRING, `nextFlowNum` STRING, `object`
> STRING, `oldIdCard` STRING, `oldValue` STRING, `openid` STRING, `operType`
> STRING, `operTypeName` STRING, `ordSerNum` STRING, `order` STRING, `orderNo`
> STRING, `orderType` STRING, `orgCode` STRING, `orgId` STRING, `orgName`
> STRING, `orgNameLevel` STRING, `orgNamePath` STRING, `osId` STRING, `osName`
> STRING, `osVersion` STRING, `parentGroupId` STRING, `parentOrgId` STRING,
> `parentOrgName` STRING, `parentOrgNamePath` STRING, `passUpdateTime` STRING,
> `password` STRING, `payItemId` STRING, `payItemName` STRING, `payTime`
> TIMESTAMP_LTZ(9), `payUnitName` STRING, `payUnitType` STRING, `personId`
> STRING, `personName` STRING, `phone` STRING, `phoneImer` STRING, `phs`
> STRING, `policyId` STRING, `policyInfo` STRING, `policyName` STRING,
> `position` STRING, `priority` STRING, `profession` STRING, `professionName`
> STRING, `protocol` STRING, `provinceFromId` STRING, `provinceFromName`
> STRING, `rate` STRING, `rawMsg` STRING, `realFee` STRING, `reason` STRING,
> `receFee` STRING, `recvPacket` STRING, `recvSize` STRING, `refundOrderNo`
> STRING, `refundOrderTime` TIMESTAMP_LTZ(9), `registerNum` STRING,
> `registerRate` STRING, `rejCode` STRING, `relateAccount` STRING,
> `relateAccountId` STRING, `relateAccountName` STRING, `remark` STRING,
> `requestMessage` STRING, `requestNo` STRING, `requestTime` TIMESTAMP_LTZ(9),
> `responseCode` STRING, `responseIp` STRING, `responseMessage` STRING,
> `result` STRING, `resultCode` STRING, `resultDesc` STRING, `retain` STRING,
> `riskLevel` STRING, `riskLevelDesc` STRING, `roleId` STRING, `roleName`
> STRING, `ruleId` STRING, `ruleName` STRING, `ruleTjCount` STRING,
> `safetyMargin` STRING, `sceneId` STRING, `sceneOneType` STRING,
> `sceneThreeType` STRING, `sceneTwoType` STRING, `sendPacket` STRING,
> `sendSize` STRING, `serialNum` STRING, `serverIp` STRING, `serverName`
> STRING, `serverPort` STRING, `service` STRING, `serviceTime`
> TIMESTAMP_LTZ(9), `sessionCount` BIGINT, `sessionId` STRING, `settleMethod`
> STRING, `sex` STRING, `shareFlag` STRING, `sid` STRING, `signData` STRING,
> `snowId` STRING, `softwareInfo` STRING, `source` STRING, `srcAdminAccount`
> STRING, `srcAdminEmail` STRING, `srcAdminFOrgId` STRING, `srcAdminId` STRING,
> `srcAdminMobile` STRING, `srcAdminName` STRING, `srcAdminOrgId` STRING,
> `srcAdminOrgName` STRING, `srcAdminType` STRING, `srcAsset` STRING,
> `srcAssetId` STRING, `srcAssetInfo` STRING, `srcAssetKey` STRING,
> `srcAssetLevel` STRING, `srcAssetModel` STRING, `srcAssetName` STRING,
> `srcAssetPid` STRING, `srcAssetStatus` STRING, `srcAssetSubType` STRING,
> `srcAssetType` STRING, `srcAssetVendor` STRING, `srcBizId` STRING, `srcCity`
> STRING, `srcCompany` STRING, `srcContnent` STRING, `srcCountry` STRING,
> `srcDbInstance` STRING, `srcDomainName` STRING, `srcFGroupId` STRING,
> `srcGroupId` STRING, `srcGroupName` STRING, `srcHostName` STRING,
> `srcIndustry` STRING, `srcIntelDesc` STRING, `srcIntelId` STRING,
> `srcIntelType` STRING, `srcInterface` STRING, `srcIp` STRING, `srcIpL`
> STRING, `srcLatitude` STRING, `srcLongitude` STRING, `srcMac` STRING,
> `srcManagerIp` STRING, `srcNatIp` STRING, `srcNatPort` STRING, `srcOperator`
> STRING, `srcOrgAdmin` STRING, `srcOrgId` STRING, `srcOrgName` STRING,
> `srcOsId` STRING, `srcPort` STRING, `srcPost` STRING, `srcProvince` STRING,
> `srcService` STRING, `srcSubDomainName` STRING, `srcUser` STRING, `srcZone`
> STRING, `staffCode` STRING, `staffCrm` STRING, `staffName` STRING,
> `staffState` STRING, `startTime` TIMESTAMP_LTZ(9), `status` STRING,
> `subAccount` STRING, `subAccountCreateTime` TIMESTAMP_LTZ(9),
> `subAccountCreateUser` STRING, `subAccountDesc` STRING, `subAccountId`
> STRING, `subAccountInvalidTime` TIMESTAMP_LTZ(9), `subAccountLoginDateLast`
> STRING, `subAccountLoginFailCount` STRING, `subAccountModifyPwdTime`
> TIMESTAMP_LTZ(9), `subAccountModifyTime` TIMESTAMP_LTZ(9), `subAccountStatus`
> STRING, `subAccountType` STRING, `subAccountValidTime` TIMESTAMP_LTZ(9),
> `sumAreaId` STRING, `sumManagerId` STRING, `tag` STRING, `taskId` STRING,
> `taskName` STRING, `telephone` STRING, `telephoneType` STRING, `tenantId`
> STRING, `tenantName` STRING, `terminalNum` STRING, `threatName` STRING,
> `threatType` STRING, `threatTypeDesc` STRING, `transBal` STRING,
> `transChannel` STRING, `transCode` STRING, `transId` STRING, `transName`
> STRING, `transStatus` STRING, `transTime` TIMESTAMP_LTZ(9), `transType`
> STRING, `type` STRING, `unitName` STRING, `updateTime` TIMESTAMP_LTZ(9),
> `upmpQn` STRING, `upmpSerialNum` STRING, `url` STRING, `user` STRING,
> `userGroupId` STRING, `userGroupName` STRING, `userId` STRING, `userOrgId`
> STRING, `userOrgName` STRING, `userType` STRING, `uuId` STRING, `value`
> STRING, `version` STRING, `voidOrderNo` STRING, `vulnId` STRING, `vulnInfo`
> STRING, `vulnLevel` STRING, `vulnName` STRING, `vulnType` STRING, `weixinId`
> STRING, `weixinVersion` STRING, `wpTag` STRING, `writeOffTime`
> TIMESTAMP_LTZ(9)>*, rowtime=false, watermark=true) ->
> Calc(select=[eventTwoType, deviceParentType, type, eventName, directionDesc,
> srcIp, dstIp, createTime, snowId]) -> SinkConversionToTuple2 -> Sink: Print
> to Std. Out (1/1)#113 (da3617016729d3dc56e6ed9de9f2d4e2) switched from
> RUNNING to FAILED with failure cause: java.lang.NullPointerException
> at SinkConversion$22.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at StreamExecCalc$18.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> at com.asap.demo.rete.ReteDemo4$1.match(ReteDemo4.java:242)
> at com.asap.demo.rete.ReteDemo4$1.processElement(ReteDemo4.java:223)
> at com.asap.demo.rete.ReteDemo4$1.processElement(ReteDemo4.java:148)
> at
> org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:213)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:178)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:748)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)